Compare commits

...

14 Commits

Author SHA1 Message Date
Pratik Mankawde
e807a3a89a Remove redundant CoroTask.h includes from test files
CoroTask.h is already transitively included via JobQueue.h.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:11:50 +00:00
Pratik Mankawde
b7caf78a81 Apply pre-commit formatting fixes to test files
clang-format auto-formatting adjustments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:11:25 +00:00
Pratik Mankawde
5e899a65b5 Migrate Coroutine_test and JobQueue_test from Boost.Coroutine to C++20 coroutines
Rewrite all test coroutine code to use postCoroTask() and co_await
runner->suspend() instead of the old postCoro() / Coro::yield() API:

- Coroutine_test: Migrate correct_order, incorrect_order, and
  thread_specific_storage tests. Replace shared_ptr<JobQueue::Coro>
  with shared_ptr<JobQueue::CoroTaskRunner>, yield() with co_await
  runner->suspend().
- JobQueue_test: Rename testPostCoro to testPostCoroTask. Migrate all
  3 sub-tests (repeated post, repeated resume, shutdown rejection)
  to the new API.

After this change, zero .cpp files reference postCoro() or
JobQueue::Coro. The old API declarations remain in JobQueue.h and
Coro.ipp for removal in the cleanup milestone.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:11:25 +00:00
Pratik Mankawde
2b94358d4e Fix circular ref leak in postCoroTask and add CoroTask<T> tests
Fix a circular reference leak in CoroTaskRunner::expectEarlyExit().
When postCoroTask() fails to post, the coroutine frame holds a
shared_ptr back to the CoroTaskRunner, creating an unreachable
cycle. Breaking the cycle by destroying the task in expectEarlyExit()
fixes the leak.

Add 3 tests for CoroTask<T> (value-returning coroutines):
- testValueReturn: inner coroutine returns int via co_return
- testValueException: exception propagation from inner coroutine
- testValueChaining: nested value-returning coroutine chain

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 23:11:09 +00:00
Pratik Mankawde
c03645d0c6 Fix clang-format in CoroTaskRunner.ipp
Collapse multi-line function calls to match CI clang-format version.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:50:45 +00:00
Pratik Mankawde
7b5f0d1916 Add missing words to cspell dictionary
Add cppcoro, fcontext, gantt, pratik, repost, stackful to
cspell.config.yaml to fix cspell check failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:45:02 +00:00
Pratik Mankawde
2fbda82edc Apply pre-commit formatting fixes
clang-format and prettier auto-formatting adjustments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:38:41 +00:00
Pratik Mankawde
4d9e594db9 Fix missed Context aggregate initialization in Application.cpp
Remove the extra {} that was for the now-deleted Context::coro field
in the RPC::JsonContext construction in Application::startGeometry().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:27:43 +00:00
Pratik Mankawde
04fe18d77c Migrate production entry points from Boost.Coroutine to C++20 coroutines
Replace all postCoro() call sites with postCoroTask() using C++20
coroutine lambdas. The key changes are:

- Remove Context::coro field (shared_ptr<JobQueue::Coro>) from
  RPC::Context, eliminating it from all aggregate initializations
- Replace RipplePathFind's yield/post/resume pattern with a local
  std::condition_variable that blocks until path-finding completes,
  avoiding colored-function infection across the RPC call chain
- Switch ServerHandler entry points (onRequest, onWSMessage) from
  postCoro to postCoroTask with co_return lambdas
- Switch GRPCServer::CallData::process() to use postCoroTask,
  rename private handler to processRequest()
- Update Path_test and AMMTest to use postCoroTask (they set
  context.coro which no longer exists)

The old postCoro() API remains available for Coroutine_test and
JobQueue_test, which will be migrated in a subsequent commit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:00:37 +00:00
Pratik Mankawde
ca15c0efd7 Add C++20 coroutine primitives: CoroTask, CoroTaskRunner, JobQueueAwaiter
Introduce the core building blocks for migrating from Boost.Coroutine to
C++20 stackless coroutines (Milestone 1):

- CoroTask<T>: RAII coroutine return type with promise_type, symmetric
  transfer via FinalAwaiter, and lazy start (suspend_always)
- CoroTaskRunner: Lifecycle manager (nested in JobQueue) mirroring the
  existing Coro class — handles LocalValues swap, nSuspend_ accounting,
  mutex-guarded resume, and join/post semantics
- JobQueueAwaiter: Convenience awaiter combining suspend + auto-repost,
  with graceful fallback when JobQueue is stopping
- postCoroTask(): JobQueue entry point for launching C++20 coroutines
- CoroTask_test.cpp: 8 unit tests covering completion, suspend/resume
  ordering, LocalValue isolation, exception propagation, and shutdown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 17:38:28 +00:00
Pratik Mankawde
bb4bc1d167 doc updated with branch names
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 17:38:08 +00:00
Pratik Mankawde
b9d14fb9e1 document update
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 17:03:00 +00:00
Pratik Mankawde
af30b71043 Plan doc added
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-02-25 16:36:45 +00:00
Ayaz Salikhov
65e63ebef3 chore: Update cleanup-workspace to delete old .conan2 dir on macOS (#6412) 2026-02-25 01:12:16 +00:00
20 changed files with 2594 additions and 182 deletions

View File

@@ -101,7 +101,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

View File

@@ -64,7 +64,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

File diff suppressed because it is too large Load Diff

View File

@@ -77,6 +77,7 @@ words:
- coro
- coros
- cowid
- cppcoro
- cryptocondition
- cryptoconditional
- cryptoconditions
@@ -99,11 +100,13 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -192,6 +195,7 @@ words:
- permissioned
- pointee
- populator
- pratik
- preauth
- preauthorization
- preauthorize
@@ -206,6 +210,7 @@ words:
- queuable
- Raphson
- replayer
- repost
- rerere
- retriable
- RIPD
@@ -236,6 +241,7 @@ words:
- soci
- socidb
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue

View File

@@ -0,0 +1,289 @@
#pragma once
#include <coroutine>
#include <exception>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
// --------------------------------------------------------------------------
// CoroTask<void> — coroutine return type for void-returning coroutines
// --------------------------------------------------------------------------
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
struct promise_type
{
std::exception_ptr exception_;
std::coroutine_handle<> continuation_;
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
std::suspend_always
initial_suspend() noexcept
{
return {};
}
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
void
return_void()
{
}
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
CoroTask() = default;
explicit CoroTask(Handle h) : handle_(h)
{
}
~CoroTask()
{
if (handle_)
handle_.destroy();
}
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
Handle
handle() const
{
return handle_;
}
bool
done() const
{
return handle_ && handle_.done();
}
// Awaiter interface — allows co_await on a CoroTask
bool
await_ready() const noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
void
await_resume()
{
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
Handle handle_;
};
// --------------------------------------------------------------------------
// CoroTask<T> — coroutine return type for value-returning coroutines
// --------------------------------------------------------------------------
template <typename T>
class CoroTask
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
struct promise_type
{
std::variant<std::monostate, T, std::exception_ptr> result_;
std::coroutine_handle<> continuation_;
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
std::suspend_always
initial_suspend() noexcept
{
return {};
}
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
CoroTask() = default;
explicit CoroTask(Handle h) : handle_(h)
{
}
~CoroTask()
{
if (handle_)
handle_.destroy();
}
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
Handle
handle() const
{
return handle_;
}
bool
done() const
{
return handle_ && handle_.done();
}
bool
await_ready() const noexcept
{
return false;
}
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_;
}
T
await_resume()
{
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
Handle handle_;
};
} // namespace xrpl

View File

@@ -0,0 +1,152 @@
#pragma once
namespace xrpl {
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), running_(false)
{
}
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
task_ = std::forward<F>(f)(shared_from_this());
}
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
XRPL_ASSERT(
finished_,
"xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
inline auto
JobQueue::CoroTaskRunner::suspend()
{
struct SuspendAwaiter
{
CoroTaskRunner& runner_;
bool
await_ready() const noexcept
{
return false;
}
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
#ifndef NDEBUG
if (task_.done())
finished_ = true;
#endif
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
return !task_.done();
}
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
// Destroy the coroutine frame to break a potential shared_ptr cycle.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Without this, the frame holds a shared_ptr
// back to this CoroTaskRunner, creating an unreachable reference cycle.
task_ = {};
}
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -2,6 +2,7 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
@@ -9,6 +10,7 @@
#include <boost/coroutine/all.hpp>
#include <coroutine>
#include <set>
namespace xrpl {
@@ -119,6 +121,92 @@ public:
join();
};
/** C++20 coroutine lifecycle manager. Replaces Coro for new code. */
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
{
private:
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
CoroTask<void> task_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
struct create_t
{
explicit create_t() = default;
};
// Private: Used in the implementation of postCoroTask
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
// Not copy-constructible or assignable
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
~CoroTaskRunner();
/** Initialize with a coroutine function.
Must be called exactly once, after the object is managed by
shared_ptr. This is handled automatically by postCoroTask().
*/
template <class F>
void
init(F&& f);
/** Increment the suspended coroutine count.
Called when the coroutine is about to suspend.
*/
void
onSuspend();
/** Decrement the suspended coroutine count without side effects.
Used to undo onSuspend() when a suspend is cancelled.
*/
void
onUndoSuspend();
/** Suspend coroutine execution.
Returns an awaiter for use with co_await.
Effects:
Increments nSuspend_ in the JobQueue.
The coroutine is suspended.
The caller must later call post() or resume() to continue.
*/
auto
suspend();
/** Schedule coroutine execution on the JobQueue.
@return true if the job is added to the JobQueue.
*/
bool
post();
/** Resume coroutine on current thread. */
void
resume();
/** Returns true if coroutine hasn't completed. */
bool
runnable() const;
/** Once called, allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine completes. */
void
join();
};
using JobFunction = std::function<void()>;
JobQueue(
@@ -165,6 +253,19 @@ public:
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Creates a C++20 coroutine and adds a job to the queue to run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
int
@@ -379,6 +480,7 @@ private:
} // namespace xrpl
#include <xrpl/core/Coro.ipp>
#include <xrpl/core/CoroTaskRunner.ipp>
namespace xrpl {
@@ -401,4 +503,26 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
return coro;
}
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
{
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
// Account for the initial suspension (lazy start).
// Mirrors the yield() in the Boost Coro constructor.
{
std::lock_guard lock(m_mutex);
++nSuspend_;
}
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
}
} // namespace xrpl

View File

@@ -0,0 +1,56 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/** Awaiter that suspends and immediately reschedules on the JobQueue.
Equivalent to calling yield() followed by post() in the old Coro API.
Usage:
co_await JobQueueAwaiter{runner};
What it waits for: The coroutine is re-queued as a job and resumes
when a worker thread picks it up.
Which thread resumes: A JobQueue worker thread.
What await_resume() returns: void.
*/
struct JobQueueAwaiter
{
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
bool
await_ready() const noexcept
{
return false;
}
bool
await_suspend(std::coroutine_handle<>)
{
// Increment nSuspend (equivalent to yield())
runner->onSuspend();
// Schedule resume on JobQueue (equivalent to post())
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// don't actually suspend — the coroutine continues
// immediately so it can clean up and co_return.
runner->onUndoSuspend();
return false;
}
return true;
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -8,6 +8,7 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -131,7 +132,6 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -155,11 +155,11 @@ public:
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;
@@ -240,28 +240,27 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
Json::Value result;
gate g;
// Test RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));
@@ -269,22 +268,22 @@ public:
// Test RPC::Tuning::max_auto_src_cur source currencies.
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
env.trust(Account("alice")["AUD"](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));

View File

@@ -0,0 +1,448 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
class CoroTask_test : public beast::unit_test::suite
{
public:
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// Test: CoroTask<void> runs to completion
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
// Test: correct_order — suspend, join, post, complete
// Mirrors existing Coroutine_test::correct_order
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
r = runner;
g1.signal();
co_await runner->suspend();
g2.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
// Test: incorrect_order — post before suspend
// Mirrors existing Coroutine_test::incorrect_order
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
// Test: JobQueueAwaiter — suspend + auto-repost
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
step = 1;
co_await JobQueueAwaiter{runner};
step = 2;
co_await JobQueueAwaiter{runner};
step = 3;
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
// Test: thread_specific_storage — per-coroutine LocalValue isolation
// Mirrors existing Coroutine_test::thread_specific_storage
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(jtCLIENT, "CoroTaskTest", [&, id = i](auto runner) -> CoroTask<void> {
a[id] = runner;
g.signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
// Test: exception propagation
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
g.signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
// Test: multiple sequential co_await points
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto runner) -> CoroTask<void> {
r = runner;
++counter;
g.signal();
co_await runner->suspend();
++counter;
g.signal();
co_await runner->suspend();
++counter;
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
// Test: CoroTask<T> returns a value via co_return
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
result = co_await inner();
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
// Test: CoroTask<T> propagates exceptions from inner coroutines
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
caught = true;
}
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
// Test: CoroTask<T> chaining — nested value-returning coroutines
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [&](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
result = co_await mul(3, 4);
g.signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
// Test: postCoroTask returns nullptr when JobQueue is stopping
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [&](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -54,13 +54,15 @@ public:
}));
gate g1, g2;
std::shared_ptr<JobQueue::Coro> c;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& cr) {
c = cr;
g1.signal();
c->yield();
g2.signal();
});
std::shared_ptr<JobQueue::CoroTaskRunner> c;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [&](auto runner) -> CoroTask<void> {
c = runner;
g1.signal();
co_await runner->suspend();
g2.signal();
co_return;
});
BEAST_EXPECT(g1.wait_for(5s));
c->join();
c->post();
@@ -81,11 +83,17 @@ public:
}));
gate g;
env.app().getJobQueue().postCoro(jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
c->yield();
g.signal();
});
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTest", [&](auto runner) -> CoroTask<void> {
// Schedule a resume before suspending. The posted job
// cannot actually call resume() until the current resume()
// releases CoroTaskRunner::mutex_, which only happens after
// the coroutine suspends at co_await.
runner->post();
co_await runner->suspend();
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
@@ -101,7 +109,7 @@ public:
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::Coro>, N> a;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
@@ -118,18 +126,19 @@ public:
for (int i = 0; i < N; ++i)
{
jq.postCoro(jtCLIENT, "CoroTest", [&, id = i](auto const& c) {
a[id] = c;
jq.postCoroTask(jtCLIENT, "CoroTest", [&, id = i](auto runner) -> CoroTask<void> {
a[id] = runner;
g.signal();
c->yield();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == -1);
*lv = id;
this->BEAST_EXPECT(*lv == id);
g.signal();
c->yield();
co_await runner->suspend();
this->BEAST_EXPECT(*lv == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();

View File

@@ -44,86 +44,85 @@ class JobQueue_test : public beast::unit_test::suite
}
void
testPostCoro()
testPostCoroTask()
{
jtx::Env env{*this};
JobQueue& jQueue = env.app().getJobQueue();
{
// Test repeated post()s until the Coro completes.
// Test repeated post()s until the coroutine completes.
std::atomic<int> yieldCount{0};
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest1",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest1", [&yieldCount](auto runner) -> CoroTask<void> {
while (++yieldCount < 4)
coroCopy->yield();
co_await runner->suspend();
co_return;
});
BEAST_EXPECT(coro != nullptr);
BEAST_EXPECT(runner != nullptr);
// Wait for the Job to run and yield.
while (yieldCount == 0)
;
// Now re-post until the Coro says it is done.
// Now re-post until the CoroTaskRunner says it is done.
int old = yieldCount;
while (coro->runnable())
while (runner->runnable())
{
BEAST_EXPECT(coro->post());
BEAST_EXPECT(runner->post());
while (old == yieldCount)
{
}
coro->join();
runner->join();
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// Test repeated resume()s until the Coro completes.
// Test repeated resume()s until the coroutine completes.
int yieldCount{0};
auto const coro = jQueue.postCoro(
jtCLIENT,
"PostCoroTest2",
[&yieldCount](std::shared_ptr<JobQueue::Coro> const& coroCopy) {
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest2", [&yieldCount](auto runner) -> CoroTask<void> {
while (++yieldCount < 4)
coroCopy->yield();
co_await runner->suspend();
co_return;
});
if (!coro)
if (!runner)
{
// There's no good reason we should not get a Coro, but we
// There's no good reason we should not get a runner, but we
// can't continue without one.
BEAST_EXPECT(false);
return;
}
// Wait for the Job to run and yield.
coro->join();
runner->join();
// Now resume until the Coro says it is done.
// Now resume until the CoroTaskRunner says it is done.
int old = yieldCount;
while (coro->runnable())
while (runner->runnable())
{
coro->resume(); // Resume runs synchronously on this thread.
runner->resume(); // Resume runs synchronously on this thread.
BEAST_EXPECT(++old == yieldCount);
}
BEAST_EXPECT(yieldCount == 4);
}
{
// If the JobQueue is stopped, we should no
// longer be able to add a Coro (and calling postCoro() should
// return false).
// longer be able to post a coroutine (and calling postCoroTask()
// should return nullptr).
using namespace std::chrono_literals;
jQueue.stop();
// The Coro should never run, so having the Coro access this
// The coroutine should never run, so having it access this
// unprotected variable on the stack should be completely safe.
// Not recommended for the faint of heart...
bool unprotected;
auto const coro = jQueue.postCoro(
jtCLIENT, "PostCoroTest3", [&unprotected](std::shared_ptr<JobQueue::Coro> const&) {
auto const runner = jQueue.postCoroTask(
jtCLIENT, "PostCoroTest3", [&unprotected](auto) -> CoroTask<void> {
unprotected = false;
co_return;
});
BEAST_EXPECT(coro == nullptr);
BEAST_EXPECT(runner == nullptr);
}
}
@@ -132,7 +131,7 @@ public:
run() override
{
testAddJob();
testPostCoro();
testPostCoroTask();
}
};

View File

@@ -6,6 +6,7 @@
#include <xrpld/rpc/RPCHandler.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
@@ -193,7 +194,6 @@ AMMTest::find_paths_request(
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;

View File

@@ -1425,7 +1425,6 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
c,
Role::ADMIN,
{},
{},
RPC::apiMaximumSupportedVersion},
jvCommand};

View File

@@ -3,6 +3,7 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/resource/Fees.h>
namespace xrpl {
@@ -99,13 +100,14 @@ GRPCServerImpl::CallData<Request, Response>::process()
// ensures that finished is always true when this CallData object
// is returned as a tag in handleRpcs(), after sending the response
finished_ = true;
auto coro = app_.getJobQueue().postCoro(
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
thisShared->process(coro);
auto runner = app_.getJobQueue().postCoroTask(
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
thisShared->processRequest();
co_return;
});
// If coro is null, then the JobQueue has already been shutdown
if (!coro)
// If runner is null, then the JobQueue has already been shutdown
if (!runner)
{
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
responder_.FinishWithError(status, this);
@@ -114,7 +116,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
GRPCServerImpl::CallData<Request, Response>::processRequest()
{
try
{
@@ -156,7 +158,6 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
request_};

View File

@@ -208,7 +208,7 @@ private:
private:
// process the request. Called inside the coroutine passed to JobQueue
void
process(std::shared_ptr<JobQueue::Coro> coro);
processRequest();
// return load type of this RPC
Resource::Charge

View File

@@ -3,7 +3,6 @@
#include <xrpld/rpc/Role.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/server/InfoSub.h>
namespace xrpl {
@@ -24,7 +23,6 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobQueue::Coro> coro{};
InfoSub::pointer infoSub{};
unsigned int apiVersion;
};

View File

@@ -169,13 +169,10 @@ public:
private:
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
void
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
processSession(std::shared_ptr<Session> const&);
void
processRequest(
@@ -183,7 +180,6 @@ private:
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&&,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user);

View File

@@ -14,6 +14,7 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
@@ -284,9 +285,10 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
processSession(detachedSession);
co_return;
});
if (postResult == nullptr)
{
@@ -322,17 +324,18 @@ ServerHandler::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoro(
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
auto const jr = this->processSession(session, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
co_return;
});
if (postResult == nullptr)
{
@@ -373,10 +376,7 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
@@ -443,7 +443,6 @@ ServerHandler::processSession(
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
@@ -514,18 +513,14 @@ ServerHandler::processSession(
return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
ServerHandler::processSession(std::shared_ptr<Session> const& session)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
@@ -562,7 +557,6 @@ ServerHandler::processRequest(
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output&& output,
std::shared_ptr<JobQueue::Coro> coro,
std::string_view forwardedFor,
std::string_view user)
{
@@ -819,7 +813,6 @@ ServerHandler::processRequest(
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,

View File

@@ -7,6 +7,9 @@
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/resource/Fees.h>
#include <condition_variable>
#include <mutex>
namespace xrpl {
// This interface is deprecated.
@@ -37,98 +40,31 @@ doRipplePathFind(RPC::JsonContext& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
// makeLegacyPathRequest enqueues a path-finding job that runs
// asynchronously. We block this thread with a condition_variable
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
[&]() {
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
std::lock_guard lk(mtx);
pathDone = true;
}
cv.notify_one();
},
context.consumer,
lpLedger,
context.params);
if (request)
{
context.coro->yield();
std::unique_lock lk(mtx);
cv.wait(lk, [&] { return pathDone; });
jvResult = request->doStatus(context.params);
}