Compare commits

..

27 Commits

Author SHA1 Message Date
JCW
222b721c7c Merge remote-tracking branch 'origin/develop' into a1q123456/fix-job-queue-stop
Signed-off-by: JCW <a1q123456@users.noreply.github.com>

# Conflicts:
#	include/xrpl/core/Coro.ipp
#	src/libxrpl/core/detail/JobQueue.cpp
#	src/test/core/Coroutine_test.cpp
2026-01-22 16:53:51 +00:00
JCW
75e402ad7a Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-11-25 14:03:11 +00:00
JCW
976fd8229b Address PR comments
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-11-21 16:53:41 +00:00
Bart
472bcf6b03 Merge branch 'develop' into a1q123456/fix-job-queue-stop 2025-10-23 11:43:35 -04:00
JCW
3e4cb67db9 Fix error
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-10-02 14:26:03 +01:00
JCW
2acee44c29 Fix error
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-10-02 12:28:29 +01:00
Jingchen
ae351b81b4 Merge branch 'develop' into a1q123456/fix-job-queue-stop 2025-10-02 11:51:29 +01:00
JCW
fbe4f7dd9f Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-10-02 11:48:56 +01:00
JCW
bce1520d4b Address PR comments
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-10-02 11:48:01 +01:00
Jingchen
1dab9323a0 Update src/xrpld/core/Coro.ipp
Co-authored-by: Bronek Kozicki <brok@incorrekt.com>
2025-10-02 10:39:54 +01:00
JCW
63ef46b676 Make the assertion in the destructor unconditional
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-26 13:24:58 +01:00
JCW
6f0767a99e Remove redundant block
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-26 13:22:50 +01:00
Bronek Kozicki
9a3a58d0f2 Merge branch 'develop' into a1q123456/fix-job-queue-stop 2025-09-26 12:19:19 +01:00
JCW
52439ebb2d Fix edge case
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-25 14:47:05 +01:00
Bronek Kozicki
622bb71cba Merge branch 'develop' into a1q123456/fix-job-queue-stop 2025-09-23 16:44:40 +01:00
JCW
528562792f Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-19 00:03:04 +01:00
JCW
34706ef0ac Fix test case bug
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-18 23:48:31 +01:00
JCW
ab52fde56e Fix multithreading bugs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-18 23:00:35 +01:00
Jingchen
671436c033 Merge branch 'develop' into a1q123456/fix-job-queue-stop 2025-09-18 22:29:27 +01:00
JCW
5f3b3a6a1e Fix multithreading bugs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-18 22:29:00 +01:00
Jingchen
a70e60e0d8 Merge branch 'develop' into a1q123456/fix-job-queue-stop 2025-09-18 09:45:32 +01:00
JCW
7fb8f5f751 Fix the bug
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-18 09:44:42 +01:00
JCW
6fd30ebde1 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-16 16:19:32 +01:00
JCW
a1f6580e54 Fix the bug
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-16 16:14:43 +01:00
JCW
32a3f0a867 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-08 14:53:58 +01:00
JCW
ed6dcdb10f Add unit test
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-08 14:53:58 +01:00
JCW
3adfa074bc Fix the assertion failure in JobQueue::stop when exiting but there's a suspended coroutine
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-09-08 14:53:58 +01:00
12 changed files with 300 additions and 111 deletions

View File

@@ -31,7 +31,7 @@ runs:
conan config install conan/profiles/ -tf $(conan config home)/profiles/
echo 'Conan profile:'
conan profile show --profile ci
conan profile show
- name: Set up Conan remote
shell: bash

View File

@@ -125,8 +125,6 @@ jobs:
subtract: ${{ inputs.nproc_subtract }}
- name: Setup Conan
env:
SANITIZERS: ${{ inputs.sanitizers }}
uses: ./.github/actions/setup-conan
- name: Build dependencies

View File

@@ -49,6 +49,10 @@ jobs:
id: version
uses: ./.github/actions/generate-version
- name: Determine recipe reference
id: ref
run: echo "ref=xrpl/${{ steps.version.outputs.version }}" >> "${GITHUB_OUTPUT}"
- name: Set up Conan
uses: ./.github/actions/setup-conan
with:
@@ -58,16 +62,17 @@ jobs:
- name: Log into Conan remote
env:
REMOTE_NAME: ${{ inputs.remote_name }}
REMOTE_USERNAME: ${{ secrets.remote_username }}
REMOTE_PASSWORD: ${{ secrets.remote_password }}
REMOTE_USERNAME: ${{ inputs.remote_username }}
REMOTE_PASSWORD: ${{ inputs.remote_password }}
run: conan remote login "${REMOTE_NAME}" "${REMOTE_USERNAME}" --password "${REMOTE_PASSWORD}"
- name: Upload Conan recipe
env:
RECIPE_REF: ${{ steps.ref.outputs.ref }}
REMOTE_NAME: ${{ inputs.remote_name }}
run: |
conan export . --version=${{ steps.version.outputs.version }}
conan upload --confirm --check --remote="${REMOTE_NAME}" xrpl/${{ steps.version.outputs.version }}
conan export .
conan upload --confirm --check --remote="${REMOTE_NAME}" ${RECIPE_REF}
outputs:
ref: xrpl/${{ steps.version.outputs.version }}
ref: ${{ steps.ref.outputs.ref }}

View File

@@ -84,8 +84,6 @@ jobs:
subtract: ${{ env.NPROC_SUBTRACT }}
- name: Setup Conan
env:
SANITIZERS: ${{ matrix.sanitizers }}
uses: ./.github/actions/setup-conan
with:
remote_name: ${{ env.CONAN_REMOTE_NAME }}
@@ -100,7 +98,6 @@ jobs:
# Set the verbosity to "quiet" for Windows to avoid an excessive
# amount of logs. For other OSes, the "verbose" logs are more useful.
log_verbosity: ${{ runner.os == 'Windows' && 'quiet' || 'verbose' }}
sanitizers: ${{ matrix.sanitizers }}
- name: Log into Conan remote
if: ${{ github.repository_owner == 'XRPLF' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}

View File

@@ -15,17 +15,21 @@ JobQueue::Coro::Coro(
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type&
do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
// self makes Coro alive until this function returns
std::shared_ptr<Coro> self;
if (!shouldStop())
{
self = shared_from_this();
fn(self);
}
state_ = CoroState::Finished;
cv_.notify_all();
},
boost::coroutines::attributes(megabytes(1)))
{
@@ -33,29 +37,58 @@ JobQueue::Coro::Coro(
inline JobQueue::Coro::~Coro()
{
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
#endif
XRPL_ASSERT(
state_ != CoroState::Running,
"xrpl::JobQueue::Coro::~Coro : is not running");
exiting_ = true;
// Resume the coroutine so that it has a chance to clean things up
if (state_ == CoroState::Suspended)
{
resume();
}
XRPL_ASSERT(
state_ == CoroState::Finished,
"xrpl::JobQueue::Coro::~Coro : is finished");
}
inline void
JobQueue::Coro::yield() const
inline bool
JobQueue::Coro::yield()
{
{
std::lock_guard lock(jq_.m_mutex);
if (shouldStop())
return false;
state_ = CoroState::Suspended;
cv_.notify_all();
++jq_.nSuspend_;
jq_.m_suspendedCoros[this] = weak_from_this();
jq_.cv_.notify_all();
}
(*yield_)();
return true;
}
inline bool
JobQueue::Coro::post()
{
if (state_ == CoroState::Finished)
{
std::lock_guard lk(mutex_run_);
running_ = true;
// The coroutine will run until it finishes if the JobQueue has stopped.
// In the case where make_shared<Coro>() succeeds and then the JobQueue
// stops before coro_ gets executed, post() will still be called and
// state_ will be Finished. We should return false and avoid XRPL_ASSERT
// as it's a valid edge case.
return false;
}
XRPL_ASSERT(
state_ == CoroState::Suspended,
"ripple::JobQueue::Coro::post : should be suspended");
// sp keeps 'this' alive
if (jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() { resume(); }))
@@ -63,9 +96,6 @@ JobQueue::Coro::post()
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
@@ -73,13 +103,18 @@ JobQueue::Coro::post()
inline void
JobQueue::Coro::resume()
{
auto suspended = CoroState::Suspended;
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
{
std::lock_guard lk(mutex_run_);
running_ = true;
return;
}
cv_.notify_all();
{
std::lock_guard lock(jq_.m_mutex);
jq_.m_suspendedCoros.erase(this);
--jq_.nSuspend_;
jq_.cv_.notify_all();
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
@@ -89,43 +124,24 @@ JobQueue::Coro::resume()
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::Coro::runnable() const
{
return static_cast<bool>(coro_);
}
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. It you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
// There's an edge case where the coroutine has updated the status
// to Finished but the function hasn't exited and therefore, coro_ is
// still valid. However, the coroutine is not technically runnable in this
// case, because the coroutine is about to exit and static_cast<bool>(coro_)
// is going to be false.
return static_cast<bool>(coro_) && state_ != CoroState::Finished;
}
inline void
JobQueue::Coro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return running_ == false; });
cv_.wait(lk, [this]() { return state_ != CoroState::Running; });
}
} // namespace xrpl

View File

@@ -37,23 +37,28 @@ struct Coro_create_t
class JobQueue : private Workers::Callback
{
public:
enum class QueueState { Accepting, Stopping, Stopped };
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
friend class JobQueue;
public:
enum class CoroState { None, Suspended, Running, Finished };
private:
std::atomic_bool exiting_ = false;
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
bool running_;
std::atomic<CoroState> state_ = CoroState::None;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
// Private: Used in the implementation
@@ -74,10 +79,12 @@ public:
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding
post.
post.
It may not suspend at all if the JobQueue is stopping, and returns
false in such a case.
*/
void
yield() const;
bool
yield();
/** Schedule coroutine execution.
Effects:
@@ -107,17 +114,23 @@ public:
void
resume();
CoroState
state() const
{
return state_;
}
/** Returns true if the Coro is still runnable (has not returned). */
bool
runnable() const;
/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine returns from the user function. */
void
join();
/** Returns true if the coroutine should stop executing */
[[nodiscard]] bool
shouldStop() const;
};
using JobFunction = std::function<void()>;
@@ -139,20 +152,17 @@ public:
@return true if jobHandler added to queue.
*/
template <
typename JobHandler,
typename = std::enable_if_t<std::is_same<
decltype(std::declval<JobHandler&&>()()),
void>::value>>
template <typename JobHandler>
bool
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
if (queueState_ != QueueState::Accepting)
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
return false;
}
return false;
return addJobNoStatusCheck(
type, name, std::forward<JobHandler>(jobHandler));
}
/** Creates a coroutine and adds a job to the queue which will run it.
@@ -211,13 +221,16 @@ public:
bool
isStopping() const
{
return stopping_;
return queueState_ == QueueState::Stopping;
}
// We may be able to move away from this, but we can keep it during the
// transition.
bool
isStopped() const;
isStopped() const
{
return queueState_ == QueueState::Stopped;
}
private:
friend class Coro;
@@ -229,8 +242,7 @@ private:
std::uint64_t m_lastJob;
std::set<Job> m_jobSet;
JobCounter jobCounter_;
std::atomic_bool stopping_{false};
std::atomic_bool stopped_{false};
std::atomic<QueueState> queueState_{QueueState::Accepting};
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
@@ -240,6 +252,8 @@ private:
// The number of suspended coroutines
int nSuspend_ = 0;
std::map<void*, std::weak_ptr<Coro>> m_suspendedCoros;
Workers m_workers;
// Statistics tracking
@@ -255,6 +269,22 @@ private:
JobTypeData&
getJobTypeData(JobType type);
template <typename JobHandler>
bool
addJobNoStatusCheck(
JobType type,
std::string const& name,
JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
}
return false;
}
// Adds a reference counted job to the JobQueue.
//
// param type The type of job.
@@ -392,6 +422,10 @@ template <class F>
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
if (queueState_ != QueueState::Accepting)
{
return nullptr;
}
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
@@ -402,7 +436,6 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
// The Coro was not successfully posted. Disable it so it's destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
return coro;

View File

@@ -6,6 +6,12 @@
namespace xrpl {
bool
JobQueue::Coro::shouldStop() const
{
return jq_.queueState_ != QueueState::Accepting || exiting_;
}
JobQueue::JobQueue(
int threadCount,
beast::insight::Collector::ptr const& collector,
@@ -274,7 +280,45 @@ JobQueue::getJobTypeData(JobType type)
void
JobQueue::stop()
{
stopping_ = true;
// Once we stop accepting new jobs, all running coroutines won't be able to
// get suspended and yield() will return immediately, so we can safely
// move m_suspendedCoros, and we can assume that no coroutine will be
// suspended in the future.
if (queueState_ == QueueState::Stopped)
{
return;
}
auto accepting = QueueState::Accepting;
if (!queueState_.compare_exchange_strong(accepting, QueueState::Stopping))
{
XRPL_ASSERT(
false, "Incorrect queueState, should be accepting but not!");
}
std::map<void*, std::weak_ptr<Coro>> suspendedCoros;
{
std::unique_lock lock(m_mutex);
suspendedCoros = std::move(m_suspendedCoros);
}
if (!suspendedCoros.empty())
{
// We should resume the suspended coroutines so that the coroutines
// get a chance to exit cleanly.
for (auto& [_, coro] : suspendedCoros)
{
if (auto coroPtr = coro.lock())
{
// We don't allow any new jobs from outside when we are
// stopping, but we should allow new jobs from inside the class.
addJobNoStatusCheck(
coroPtr->type_, coroPtr->name_, [coroPtr]() {
coroPtr->resume();
});
}
}
}
using namespace std::chrono_literals;
jobCounter_.join("JobQueue", 1s, m_journal);
{
@@ -284,8 +328,9 @@ JobQueue::stop()
// `Job::doJob` and the return of `JobQueue::processTask`. That is why
// we must wait on the condition variable to make these assertions.
std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait(
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
cv_.wait(lock, [this] {
return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty();
});
XRPL_ASSERT(
m_processCount == 0,
"xrpl::JobQueue::stop : all processes completed");
@@ -293,14 +338,12 @@ JobQueue::stop()
m_jobSet.empty(), "xrpl::JobQueue::stop : all jobs completed");
XRPL_ASSERT(
nSuspend_ == 0, "xrpl::JobQueue::stop : no coros suspended");
stopped_ = true;
}
}
bool
JobQueue::isStopped() const
{
return stopped_;
auto stopping = QueueState::Stopping;
if (!queueState_.compare_exchange_strong(stopping, QueueState::Stopped))
{
XRPL_ASSERT(false, "Incorrect queueState, should be stopping but not!");
}
}
void

View File

@@ -82,12 +82,21 @@ public:
}));
gate g;
env.app().getJobQueue().postCoro(
gate gStart;
auto coro = env.app().getJobQueue().postCoro(
jtCLIENT, "CoroTest", [&](auto const& c) {
c->post();
gStart.signal();
c->yield();
g.signal();
});
// Wait for the coroutine to start.
BEAST_EXPECT(gStart.wait_for(5s));
BEAST_EXPECT(coro->state() == JobQueue::Coro::CoroState::Suspended);
// Post the coroutine.
coro->post();
BEAST_EXPECT(g.wait_for(5s));
}
@@ -156,12 +165,78 @@ public:
BEAST_EXPECT(*lv == -1);
}
void
stopJobQueueWhenCoroutineSuspended()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Stop JobQueue when a coroutine is suspended");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
bool started = false;
bool finished = false;
std::optional<bool> shouldStop;
std::condition_variable cv;
std::mutex m;
std::unique_lock<std::mutex> lk(m);
auto coro = env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
started = true;
cv.notify_all();
c->yield();
finished = true;
shouldStop = c->shouldStop();
cv.notify_all();
});
cv.wait_for(lk, 5s, [&]() { return started; });
env.app().getJobQueue().stop();
cv.wait_for(lk, 5s, [&]() { return finished; });
BEAST_EXPECT(finished);
BEAST_EXPECT(shouldStop.has_value() && *shouldStop == true);
}
void
coroutineGetsDestroyedBeforeExecuting()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Coroutine gets destroyed before executing");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
{
auto coro = std::make_shared<JobQueue::Coro>(
Coro_create_t{},
env.app().getJobQueue(),
JobType::jtCLIENT,
"test",
[](auto coro) {
});
}
pass();
}
void
run() override
{
correct_order();
incorrect_order();
thread_specific_storage();
stopJobQueueWhenCoroutineSuspended();
coroutineGetsDestroyedBeforeExecuting();
}
};

View File

@@ -67,6 +67,8 @@ class JobQueue_test : public beast::unit_test::suite
while (yieldCount == 0)
;
coro->join();
// Now re-post until the Coro says it is done.
int old = yieldCount;
while (coro->runnable())
@@ -98,6 +100,9 @@ class JobQueue_test : public beast::unit_test::suite
return;
}
while (yieldCount == 0)
; // We should wait for the job to start and yield
// Wait for the Job to run and yield.
coro->join();

View File

@@ -15,9 +15,16 @@
#include <xrpl/protocol/Units.h>
#include <xrpl/protocol/jss.h>
#include <source_location>
#include <vector>
#if (defined(__clang_major__) && __clang_major__ < 15)
#include <experimental/source_location>
using source_location = std::experimental::source_location;
#else
#include <source_location>
using std::source_location;
#endif
namespace xrpl {
namespace test {
namespace jtx {
@@ -633,7 +640,7 @@ checkMetrics(
std::size_t expectedPerLedger,
std::uint64_t expectedMinFeeLevel = baseFeeLevel.fee(),
std::uint64_t expectedMedFeeLevel = minEscalationFeeLevel.fee(),
std::source_location const location = std::source_location::current())
source_location const location = source_location::current())
{
int line = location.line();
char const* file = location.file_name();

View File

@@ -14,7 +14,13 @@
#include <xrpl/protocol/STXChainBridge.h>
#include <xrpl/protocol/jss.h>
#if (defined(__clang_major__) && __clang_major__ < 15)
#include <experimental/source_location>
using source_location = std::experimental::source_location;
#else
#include <source_location>
using std::source_location;
#endif
namespace xrpl {
namespace test {
@@ -108,7 +114,7 @@ class LedgerEntry_test : public beast::unit_test::suite
Json::Value const& jv,
std::string const& err,
std::string const& msg,
std::source_location const location = std::source_location::current())
source_location const location = source_location::current())
{
if (BEAST_EXPECT(jv.isMember(jss::status)))
BEAST_EXPECTS(
@@ -291,7 +297,7 @@ class LedgerEntry_test : public beast::unit_test::suite
FieldType const typeID,
std::string const& expectedError,
bool required = true,
std::source_location const location = std::source_location::current())
source_location const location = source_location::current())
{
forAllApiVersions([&, this](unsigned apiVersion) {
if (required)
@@ -344,7 +350,7 @@ class LedgerEntry_test : public beast::unit_test::suite
FieldType typeID,
std::string const& expectedError,
bool required = true,
std::source_location const location = std::source_location::current())
source_location const location = source_location::current())
{
forAllApiVersions([&, this](unsigned apiVersion) {
if (required)
@@ -401,7 +407,7 @@ class LedgerEntry_test : public beast::unit_test::suite
runLedgerEntryTest(
test::jtx::Env& env,
Json::StaticString const& parentField,
std::source_location const location = std::source_location::current())
source_location const location = source_location::current())
{
testMalformedField(
env,
@@ -425,7 +431,7 @@ class LedgerEntry_test : public beast::unit_test::suite
test::jtx::Env& env,
Json::StaticString const& parentField,
std::vector<Subfield> const& subfields,
std::source_location const location = std::source_location::current())
source_location const location = source_location::current())
{
testMalformedField(
env,

View File

@@ -109,21 +109,17 @@ doRipplePathFind(RPC::JsonContext& context)
// May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
[coro = context.coro]() {
// Capturing the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
// coro->post().
// When post() failed, we won't get a thread to let
// the Coro finish. We should ignore the coroutine and
// let it destruct, as the JobQueu has been signaled to
// close, and resuming it manually messes up the internal
// state in JobQueue.
coro->post();
},
context.consumer,
lpLedger,
@@ -131,6 +127,14 @@ doRipplePathFind(RPC::JsonContext& context)
if (request)
{
context.coro->yield();
// Each time after we resume from yield(), we should
// check if cancellation has been requested. It would
// be a lot more elegant if we replace boost coroutine
// with c++ standard coroutine.
if (context.coro->shouldStop())
{
return jvResult;
}
jvResult = request->doStatus(context.params);
}