Compare commits

..

41 Commits

Author SHA1 Message Date
Ed Hennis
ae41a712b3 Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-24 17:34:55 -04:00
Ed Hennis
7684f9dd58 Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-20 18:49:57 -04:00
Ed Hennis
7c34be898d Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-20 18:26:15 -04:00
Ed Hennis
5e282f49da Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-20 17:31:58 -04:00
Ed Hennis
86722689ac Merge remote-tracking branch 'upstream/develop' into ximinez/fix/validator-cache
* upstream/develop:
  ci: [DEPENDABOT] bump tj-actions/changed-files from 46.0.5 to 47.0.4 (6394)
  ci: [DEPENDABOT] bump codecov/codecov-action from 5.4.3 to 5.5.2 (6398)
  ci: Build docs in PRs and in private repos (6400)
  ci: Add dependabot config (6379)
  Fix tautological assertion (6393)
  chore: Apply clang-format width 100 (6387)
2026-02-20 16:25:05 -05:00
Ed Hennis
3759144bba Update formatting 2026-02-20 16:23:53 -05:00
Ed Hennis
16c41c2143 Merge commit '25cca465538a56cce501477f9e5e2c1c7ea2d84c' into ximinez/fix/validator-cache
* commit '25cca465538a56cce501477f9e5e2c1c7ea2d84c':
  chore: Set clang-format width to 100 in config file (6387)
2026-02-20 16:22:13 -05:00
Ed Hennis
af28042946 Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-19 16:24:50 -05:00
Ed Hennis
734426554d Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-18 20:49:50 -04:00
Ed Hennis
7f17daa95f Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-04 16:30:05 -04:00
Ed Hennis
f359cd8dad Merge branch 'develop' into ximinez/fix/validator-cache 2026-02-03 16:08:02 -04:00
Ed Hennis
bf0b10404d Fix formatting 2026-01-28 19:40:27 -05:00
Ed Hennis
d019ebaf36 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-28 18:44:52 -04:00
Ed Hennis
b6e4620349 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-15 13:03:28 -04:00
Ed Hennis
db0ef6a370 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-15 12:05:56 -04:00
Ed Hennis
11a45a0ac2 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-13 18:19:08 -04:00
Ed Hennis
aa035f4cfd Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-13 15:27:57 -04:00
Ed Hennis
8988f9117f Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-12 14:52:12 -04:00
Ed Hennis
ae4f379845 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-11 00:50:40 -04:00
Ed Hennis
671aa11649 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-08 17:06:06 -04:00
Ed Hennis
53d35fd8ea Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-08 13:04:16 -04:00
Ed Hennis
0c7ea2e333 Merge branch 'develop' into ximinez/fix/validator-cache 2026-01-06 14:02:10 -05:00
Ed Hennis
5f54be25e9 Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-22 17:39:55 -05:00
Ed Hennis
d82756519c Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-18 19:59:49 -05:00
Ed Hennis
1f23832659 Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-12 20:34:55 -05:00
Ed Hennis
4c50969bde Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-11 15:31:29 -05:00
Ed Hennis
aabdf372dd Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-05 21:13:06 -05:00
Ed Hennis
c6d63a4b90 Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-02 17:37:25 -05:00
Ed Hennis
1e6c3208db Merge branch 'develop' into ximinez/fix/validator-cache 2025-12-01 14:40:41 -05:00
Ed Hennis
a74f223efb Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-28 15:46:40 -05:00
Ed Hennis
1eb3a3ea5a Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-27 01:48:53 -05:00
Ed Hennis
630e428929 Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-26 00:25:12 -05:00
Ed Hennis
3f93edc5e0 Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-25 14:55:02 -05:00
Ed Hennis
baf62689ff Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-24 21:49:07 -05:00
Ed Hennis
ddf7d6cac4 Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-24 21:30:18 -05:00
Ed Hennis
fcd2ea2d6e Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-21 12:47:54 -05:00
Ed Hennis
a16aa5b12f Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-18 22:39:25 -05:00
Ed Hennis
ef2de81870 Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-15 03:08:38 -05:00
Ed Hennis
fce6757260 Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-13 12:19:10 -05:00
Ed Hennis
d759a0a2b0 Merge branch 'develop' into ximinez/fix/validator-cache 2025-11-12 14:12:51 -05:00
Ed Hennis
d2dda416e8 Use Validator List (VL) cache files in more scenarios
- If any [validator_list_keys] are not available after all
  [validator_list_sites] have had a chance to be queried, then fall
  back to loading cache files. Currently, cache files are only used if
  no sites are defined, or the request to one of them has an error. It
  does not include cases where not enough sites are defined, or if a
  site returns an invalid VL (or something else entirely).
- Resolves #5320
2025-11-10 19:53:02 -05:00
26 changed files with 457 additions and 2542 deletions

View File

@@ -101,7 +101,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

View File

@@ -64,7 +64,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

File diff suppressed because it is too large Load Diff

View File

@@ -16,6 +16,8 @@ find_dependency(Boost
COMPONENTS
chrono
container
context
coroutine
date_time
filesystem
program_options

View File

@@ -22,6 +22,7 @@ target_compile_definitions(
BOOST_FILESYSTEM_NO_DEPRECATED
>
$<$<NOT:$<BOOL:${boost_show_deprecated}>>:
BOOST_COROUTINES_NO_DEPRECATION_WARNING
BOOST_BEAST_ALLOW_DEPRECATED
BOOST_FILESYSTEM_DEPRECATED
>

View File

@@ -4,6 +4,7 @@ include(XrplSanitizers)
find_package(Boost REQUIRED
COMPONENTS chrono
container
coroutine
date_time
filesystem
json
@@ -20,6 +21,7 @@ target_link_libraries(
INTERFACE Boost::headers
Boost::chrono
Boost::container
Boost::coroutine
Boost::date_time
Boost::filesystem
Boost::json

View File

@@ -196,6 +196,7 @@ class Xrpl(ConanFile):
"boost::headers",
"boost::chrono",
"boost::container",
"boost::coroutine",
"boost::date_time",
"boost::filesystem",
"boost::json",

View File

@@ -77,7 +77,6 @@ words:
- coro
- coros
- cowid
- cppcoro
- cryptocondition
- cryptoconditional
- cryptoconditions
@@ -100,13 +99,11 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -195,7 +192,6 @@ words:
- permissioned
- pointee
- populator
- pratik
- preauth
- preauthorization
- preauthorize
@@ -210,7 +206,6 @@ words:
- queuable
- Raphson
- replayer
- repost
- rerere
- retriable
- RIPD
@@ -241,7 +236,6 @@ words:
- soci
- socidb
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue

122
include/xrpl/core/Coro.ipp Normal file
View File

@@ -0,0 +1,122 @@
#pragma once
#include <xrpl/basics/ByteUtilities.h>
namespace xrpl {
template <class F>
JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string const& name, F&& f)
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
},
boost::coroutines::attributes(megabytes(1)))
{
}
inline JobQueue::Coro::~Coro()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
#endif
}
inline void
JobQueue::Coro::yield() const
{
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
(*yield_)();
}
inline bool
JobQueue::Coro::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp keeps 'this' alive
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline void
JobQueue::Coro::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(static_cast<bool>(coro_), "xrpl::JobQueue::Coro::resume : is runnable");
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::Coro::runnable() const
{
return static_cast<bool>(coro_);
}
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void
JobQueue::Coro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -1,289 +0,0 @@
#pragma once
#include <coroutine>
#include <exception>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
// --------------------------------------------------------------------------
// CoroTask<void> — coroutine return type for void-returning coroutines
// --------------------------------------------------------------------------
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
struct promise_type
{
std::exception_ptr exception_;
std::coroutine_handle<> continuation_;
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
std::suspend_always
initial_suspend() noexcept
{
return {};
}
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
void
return_void()
{
}
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
CoroTask() = default;
explicit CoroTask(Handle h) : handle_(h)
{
}
~CoroTask()
{
if (handle_)
handle_.destroy();
}
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
Handle
handle() const
{
return handle_;
}
bool
done() const
{
return handle_ && handle_.done();
}
// Awaiter interface — allows co_await on a CoroTask
bool
await_ready() const noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
void
await_resume()
{
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
Handle handle_;
};
// --------------------------------------------------------------------------
// CoroTask<T> — coroutine return type for value-returning coroutines
// --------------------------------------------------------------------------
template <typename T>
class CoroTask
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
struct promise_type
{
std::variant<std::monostate, T, std::exception_ptr> result_;
std::coroutine_handle<> continuation_;
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
std::suspend_always
initial_suspend() noexcept
{
return {};
}
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
CoroTask() = default;
explicit CoroTask(Handle h) : handle_(h)
{
}
~CoroTask()
{
if (handle_)
handle_.destroy();
}
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
Handle
handle() const
{
return handle_;
}
bool
done() const
{
return handle_ && handle_.done();
}
bool
await_ready() const noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_;
}
T
await_resume()
{
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
Handle handle_;
};
} // namespace xrpl

View File

@@ -1,152 +0,0 @@
#pragma once
namespace xrpl {
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), running_(false)
{
}
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
task_ = std::forward<F>(f)(shared_from_this());
}
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
XRPL_ASSERT(
finished_,
"xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
inline auto
JobQueue::CoroTaskRunner::suspend()
{
struct SuspendAwaiter
{
CoroTaskRunner& runner_;
bool
await_ready() const noexcept
{
return false;
}
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
#ifndef NDEBUG
if (task_.done())
finished_ = true;
#endif
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
return !task_.done();
}
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
// Destroy the coroutine frame to break a potential shared_ptr cycle.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Without this, the frame holds a shared_ptr
// back to this CoroTaskRunner, creating an unreachable reference cycle.
task_ = {};
}
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -2,13 +2,13 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
#include <xrpl/json/json_value.h>
#include <coroutine>
#include <boost/coroutine/all.hpp>
#include <set>
namespace xrpl {
@@ -18,6 +18,10 @@ class PerfLog;
}
class Logs;
struct Coro_create_t
{
explicit Coro_create_t() = default;
};
/** A pool of threads to perform work.
@@ -32,8 +36,8 @@ class Logs;
class JobQueue : private Workers::Callback
{
public:
/** C++20 coroutine lifecycle manager. */
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
private:
detail::LocalValues lvs_;
@@ -44,76 +48,73 @@ public:
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
CoroTask<void> task_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
struct create_t
{
explicit create_t() = default;
};
// Private: Used in the implementation of postCoroTask
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
// Private: Used in the implementation
template <class F>
Coro(Coro_create_t, JobQueue&, JobType, std::string const&, F&&);
// Not copy-constructible or assignable
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
Coro(Coro const&) = delete;
Coro&
operator=(Coro const&) = delete;
~CoroTaskRunner();
/** Initialize with a coroutine function.
Must be called exactly once, after the object is managed by
shared_ptr. This is handled automatically by postCoroTask().
*/
template <class F>
void
init(F&& f);
/** Increment the suspended coroutine count.
Called when the coroutine is about to suspend.
*/
void
onSuspend();
/** Decrement the suspended coroutine count without side effects.
Used to undo onSuspend() when a suspend is cancelled.
*/
void
onUndoSuspend();
~Coro();
/** Suspend coroutine execution.
Returns an awaiter for use with co_await.
Effects:
Increments nSuspend_ in the JobQueue.
The coroutine is suspended.
The caller must later call post() or resume() to continue.
The coroutine's stack is saved.
The associated Job thread is released.
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding
post.
*/
auto
suspend();
void
yield() const;
/** Schedule coroutine execution on the JobQueue.
@return true if the job is added to the JobQueue.
/** Schedule coroutine execution.
Effects:
Returns immediately.
A new job is scheduled to resume the execution of the coroutine.
When the job runs, the coroutine's stack is restored and execution
continues at the beginning of coroutine function or the
statement after the previous call to yield. Undefined behavior if
called after the coroutine has completed with a return (as opposed to
a yield()). Undefined behavior if post() or resume() called
consecutively without a corresponding yield.
@return true if the Coro's job is added to the JobQueue.
*/
bool
post();
/** Resume coroutine on current thread. */
/** Resume coroutine execution.
Effects:
The coroutine continues execution from where it last left off
using this same thread.
Undefined behavior if called after the coroutine has completed
with a return (as opposed to a yield()).
Undefined behavior if resume() or post() called consecutively
without a corresponding yield.
*/
void
resume();
/** Returns true if coroutine hasn't completed. */
/** Returns true if the Coro is still runnable (has not returned). */
bool
runnable() const;
/** Once called, allows early exit without an assert. */
/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine completes. */
/** Waits until coroutine returns from the user function. */
void
join();
};
@@ -151,18 +152,18 @@ public:
return false;
}
/** Creates a C++20 coroutine and adds a job to the queue to run it.
/** Creates a coroutine and adds a job to the queue which will run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@param f Has a signature of void(std::shared_ptr<Coro>). Called when the
job executes.
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
@return shared_ptr to posted Coro. nullptr if post was not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
@@ -216,6 +217,8 @@ public:
isStopped() const;
private:
friend class Coro;
using JobDataMap = std::map<JobType, JobTypeData>;
beast::Journal m_journal;
@@ -316,31 +319,86 @@ private:
getJobLimit(JobType type);
};
/*
An RPC command is received and is handled via ServerHandler(HTTP) or
Handler(websocket), depending on the connection type. The handler then calls
the JobQueue::postCoro() method to create a coroutine and run it at a later
point. This frees up the handler thread and allows it to continue handling
other requests while the RPC command completes its work asynchronously.
postCoro() creates a Coro object. When the Coro ctor is called, and its
coro_ member is initialized (a boost::coroutines::pull_type), execution
automatically passes to the coroutine, which we don't want at this point,
since we are still in the handler thread context. It's important to note
here that construction of a boost pull_type automatically passes execution to
the coroutine. A pull_type object automatically generates a push_type that is
passed as a parameter (do_yield) in the signature of the function the
pull_type was created with. This function is immediately called during coro_
construction and within it, Coro::yield_ is assigned the push_type
parameter (do_yield) address and called (yield()) so we can return execution
back to the caller's stack.
postCoro() then calls Coro::post(), which schedules a job on the job
queue to continue execution of the coroutine in a JobQueue worker thread at
some later time. When the job runs, we lock on the Coro::mutex_ and call
coro_ which continues where we had left off. Since we the last thing we did
in coro_ was call yield(), the next thing we continue with is calling the
function param f, that was passed into Coro ctor. It is within this
function body that the caller specifies what he would like to do while
running in the coroutine and allow them to suspend and resume execution.
A task that relies on other events to complete, such as path finding, calls
Coro::yield() to suspend its execution while waiting on those events to
complete and continue when signaled via the Coro::post() method.
There is a potential race condition that exists here where post() can get
called before yield() after f is called. Technically the problem only occurs
if the job that post() scheduled is executed before yield() is called.
If the post() job were to be executed before yield(), undefined behavior
would occur. The lock ensures that coro_ is not called again until we exit
the coroutine. At which point a scheduled resume() job waiting on the lock
would gain entry, harmlessly call coro_ and immediately return as we have
already completed the coroutine.
The race condition occurs as follows:
1- The coroutine is running.
2- The coroutine is about to suspend, but before it can do so, it must
arrange for some event to wake it up.
3- The coroutine arranges for some event to wake it up.
4- Before the coroutine can suspend, that event occurs and the
resumption of the coroutine is scheduled on the job queue. 5- Again, before
the coroutine can suspend, the resumption of the coroutine is dispatched. 6-
Again, before the coroutine can suspend, the resumption code runs the
coroutine.
The coroutine is now running in two threads.
The lock prevents this from happening as step 6 will block until the
lock is released which only happens after the coroutine completes.
*/
} // namespace xrpl
#include <xrpl/core/CoroTaskRunner.ipp>
#include <xrpl/core/Coro.ipp>
namespace xrpl {
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
// Account for the initial suspension (CoroTask uses lazy start).
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
*/
auto coro = std::make_shared<Coro>(Coro_create_t{}, *this, t, name, std::forward<F>(f));
if (!coro->post())
{
std::lock_guard lock(m_mutex);
++nSuspend_;
// The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
return coro;
}
} // namespace xrpl

View File

@@ -1,56 +0,0 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/** Awaiter that suspends and immediately reschedules on the JobQueue.
Equivalent to calling yield() followed by post() in the old Coro API.
Usage:
co_await JobQueueAwaiter{runner};
What it waits for: The coroutine is re-queued as a job and resumes
when a worker thread picks it up.
Which thread resumes: A JobQueue worker thread.
What await_resume() returns: void.
*/
struct JobQueueAwaiter
{
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
bool
await_ready() const noexcept
{
return false;
}
bool
await_suspend(std::coroutine_handle<>)
{
// Increment nSuspend (equivalent to yield())
runner->onSuspend();
// Schedule resume on JobQueue (equivalent to post())
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// don't actually suspend — the coroutine continues
// immediately so it can clean up and co_return.
runner->onUndoSuspend();
return false;
}
return true;
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -8,7 +8,6 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -132,6 +131,7 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -155,11 +155,11 @@ public:
Json::Value result;
gate g;
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;
@@ -240,27 +240,28 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
Json::Value result;
gate g;
// Test RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));
@@ -268,22 +269,22 @@ public:
// Test RPC::Tuning::max_auto_src_cur source currencies.
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
env.trust(Account("alice")["AUD"](100), "bob");
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));

View File

@@ -1,448 +0,0 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
class CoroTask_test : public beast::unit_test::suite
{
public:
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// Test: CoroTask<void> runs to completion
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
// Test: correct_order — suspend, join, post, complete
// Mirrors existing Coroutine_test::correct_order
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
r = runner;
g1.signal();
co_await runner->suspend();
g2.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
// Test: incorrect_order — post before suspend
// Mirrors existing Coroutine_test::incorrect_order
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
// Test: JobQueueAwaiter — suspend + auto-repost
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
step = 1;
co_await JobQueueAwaiter{runner};
step = 2;
co_await JobQueueAwaiter{runner};
step = 3;
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
// Test: thread_specific_storage — per-coroutine LocalValue isolation
// Mirrors existing Coroutine_test::thread_specific_storage
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(jtCLIENT, "CoroTaskTest", [&, id = i](auto runner) -> CoroTask<void> {
a[id] = runner;
g.signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
// Test: exception propagation
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
g.signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
// Test: multiple sequential co_await points
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
r = runner;
++counter;
g.signal();
co_await runner->suspend();
++counter;
g.signal();
co_await runner->suspend();
++counter;
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
// Test: CoroTask<T> returns a value via co_return
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
result = co_await inner();
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
// Test: CoroTask<T> propagates exceptions from inner coroutines
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
caught = true;
}
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
// Test: CoroTask<T> chaining — nested value-returning coroutines
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [&](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
result = co_await mul(3, 4);
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
// Test: postCoroTask returns nullptr when JobQueue is stopping
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -54,15 +54,13 @@ public:
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> c;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [&](auto runner) -> CoroTask<void> {
c = runner;
g1.signal();
co_await runner->suspend();
g2.signal();
co_return;
});
std::shared_ptr<JobQueue::Coro> c;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
BEAST_EXPECT(g1.wait_for(5s));
c->join();
c->post();
@@ -83,17 +81,11 @@ public:
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [&](auto runner) -> CoroTask<void> {
// Schedule a resume before suspending. The posted job
// cannot actually call resume() until the current resume()
// releases CoroTaskRunner::mutex_, which only happens after
// the coroutine suspends at co_await.
runner->post();
co_await runner->suspend();
g.signal();
co_return;
});
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
}
@@ -109,7 +101,7 @@ public:
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -126,19 +118,18 @@ public:
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(jtCLIENT, "CoroTest", [&, id = i](auto runner) -> CoroTask<void> {
a[id] = runner;
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
a[id] = c;
g.signal();
co_await runner->suspend();
c->yield();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
co_await runner->suspend();
c->yield();
this->BEAST_EXPECT(*lv == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();

View File

@@ -44,85 +44,86 @@ class JobQueue_test : public beast::unit_test::suite
}
void
testPostCoroTask()
testPostCoro()
{
jtx::Env env{*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// Test repeated post()s until the coroutine completes.
// Test repeated post()s until the Coro completes.
std::atomic<int> yieldCount{0};
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest1", [&yieldCount](auto runner) -> CoroTask<void> {
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest1",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
co_await runner->suspend();
co_return;
coroCopy->yield();
});
BEAST_EXPECT(runner != nullptr);
BEAST_EXPECT(coro != nullptr);
// Wait for the Job to run and yield.
while (yieldCount == 0)
;
// Now re-post until the CoroTaskRunner says it is done.
// Now re-post until the Coro says it is done.
int old = yieldCount;
while (runner->runnable())
while (coro->runnable())
{
BEAST_EXPECT(runner->post());
BEAST_EXPECT(coro->post());
while (old == yieldCount)
{
}
runner->join();
coro->join();
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// Test repeated resume()s until the coroutine completes.
// Test repeated resume()s until the Coro completes.
int yieldCount{0};
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest2", [&yieldCount](auto runner) -> CoroTask<void> {
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest2",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
while (++yieldCount < 4)
co_await runner->suspend();
co_return;
coroCopy->yield();
});
if (!runner)
if (!coro)
{
// There's no good reason we should not get a runner, but we
// There's no good reason we should not get a Coro, but we
// can't continue without one.
BEAST_EXPECT(false);
return;
}
// Wait for the Job to run and yield.
runner->join();
coro->join();
// Now resume until the CoroTaskRunner says it is done.
// Now resume until the Coro says it is done.
int old = yieldCount;
while (runner->runnable())
while (coro->runnable())
{
runner->resume(); // Resume runs synchronously on this thread.
coro->resume(); // Resume runs synchronously on this thread.
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// If the JobQueue is stopped, we should no
// longer be able to post a coroutine (and calling postCoroTask()
// should return nullptr).
// longer be able to add a Coro (and calling postCoro() should
// return false).
using namespace std::chrono_literals;
jQueue.stop();
// The coroutine should never run, so having it access this
// The Coro should never run, so having the Coro access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest3", [&unprotected](auto) -> CoroTask<void> {
auto const coro = jQueue.postCoro(
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
unprotected = false;
co_return;
});
BEAST_EXPECT(runner == nullptr);
BEAST_EXPECT(coro == nullptr);
}
}
@@ -131,7 +132,7 @@ public:
run() override
{
testAddJob();
testPostCoroTask();
testPostCoro();
}
};

View File

@@ -6,7 +6,6 @@
#include <xrpld/rpc/RPCHandler.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
@@ -194,6 +193,7 @@ AMMTest::find_paths_request(
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
Json::Value result;
gate g;
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;

View File

@@ -1425,6 +1425,7 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
c,
Role::ADMIN,
{},
{},
RPC::apiMaximumSupportedVersion},
jvCommand};

View File

@@ -3,7 +3,6 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/resource/Fees.h>
namespace xrpl {
@@ -100,14 +99,13 @@ GRPCServerImpl::CallData<Request, Response>::process()
// ensures that finished is always true when this CallData object
// is returned as a tag in handleRpcs(), after sending the response
finished_ = true;
auto runner = app_.getJobQueue().postCoroTask(
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
thisShared->processRequest();
co_return;
auto coro = app_.getJobQueue().postCoro(
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
thisShared->process(coro);
});
// If runner is null, then the JobQueue has already been shutdown
if (!runner)
// If coro is null, then the JobQueue has already been shutdown
if (!coro)
{
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
responder_.FinishWithError(status, this);
@@ -116,7 +114,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::processRequest()
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
{
try
{
@@ -158,6 +156,7 @@ GRPCServerImpl::CallData<Request, Response>::processRequest()
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
request_};

View File

@@ -208,7 +208,7 @@ private:
private:
// process the request. Called inside the coroutine passed to JobQueue
void
processRequest();
process(std::shared_ptr<JobQueue::Coro> coro);
// return load type of this RPC
Resource::Charge

View File

@@ -129,7 +129,11 @@ ValidatorSite::load(
{
try
{
sites_.emplace_back(uri);
// This is not super efficient, but it doesn't happen often.
bool found = std::ranges::any_of(
sites_, [&uri](auto const& site) { return site.loadedResource->uri == uri; });
if (!found)
sites_.emplace_back(uri);
}
catch (std::exception const& e)
{
@@ -190,6 +194,16 @@ ValidatorSite::setTimer(
std::lock_guard<std::mutex> const& site_lock,
std::lock_guard<std::mutex> const& state_lock)
{
if (!sites_.empty() && //
std::ranges::all_of(
sites_, [](auto const& site) { return site.lastRefreshStatus.has_value(); }))
{
// If all of the sites have been handled at least once (including
// errors and timeouts), call missingSite, which will load the cache
// files for any lists that are still unavailable.
missingSite(site_lock);
}
auto next = std::min_element(sites_.begin(), sites_.end(), [](Site const& a, Site const& b) {
return a.nextRefresh < b.nextRefresh;
});
@@ -299,12 +313,15 @@ ValidatorSite::onRequestTimeout(std::size_t siteIdx, error_code const& ec)
// processes a network error. Usually, this function runs first,
// but on extremely rare occasions, the response handler can run
// first, which will leave activeResource empty.
auto const& site = sites_[siteIdx];
auto& site = sites_[siteIdx];
if (site.activeResource)
JLOG(j_.warn()) << "Request for " << site.activeResource->uri << " took too long";
else
JLOG(j_.error()) << "Request took too long, but a response has "
"already been processed";
if (!site.lastRefreshStatus)
site.lastRefreshStatus.emplace(
Site::Status{clock_type::now(), ListDisposition::invalid, "timeout"});
}
std::lock_guard lock_state{state_mutex_};

View File

@@ -3,6 +3,7 @@
#include <xrpld/rpc/Role.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/server/InfoSub.h>
namespace xrpl {
@@ -23,6 +24,7 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobQueue::Coro> coro{};
InfoSub::pointer infoSub{};
unsigned int apiVersion;
};

View File

@@ -169,10 +169,13 @@ public:
private:
Json::Value
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
void
processSession(std::shared_ptr<Session> const&);
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
void
processRequest(
@@ -180,6 +183,7 @@ private:
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&&,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user);

View File

@@ -14,7 +14,6 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
@@ -285,10 +284,9 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
processSession(detachedSession);
co_return;
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
});
if (postResult == nullptr)
{
@@ -324,18 +322,17 @@ ServerHandler::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoroTask(
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
auto const jr = this->processSession(session, jv);
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
co_return;
});
if (postResult == nullptr)
{
@@ -376,7 +373,10 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
@@ -443,6 +443,7 @@ ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::V
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
@@ -513,14 +514,18 @@ ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::V
return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(std::shared_ptr<Session> const& session)
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
@@ -557,6 +562,7 @@ ServerHandler::processRequest(
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
{
@@ -813,6 +819,7 @@ ServerHandler::processRequest(
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,

View File

@@ -7,9 +7,6 @@
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/resource/Fees.h>
#include <condition_variable>
#include <mutex>
namespace xrpl {
// This interface is deprecated.
@@ -40,31 +37,98 @@ doRipplePathFind(RPC::JsonContext& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// makeLegacyPathRequest enqueues a path-finding job that runs
// asynchronously. We block this thread with a condition_variable
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&]() {
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
{
std::lock_guard lk(mtx);
pathDone = true;
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
cv.notify_one();
},
context.consumer,
lpLedger,
context.params);
if (request)
{
std::unique_lock lk(mtx);
cv.wait(lk, [&] { return pathDone; });
context.coro->yield();
jvResult = request->doStatus(context.params);
}