Address PR comments

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-10-02 11:47:42 +01:00
parent 1dab9323a0
commit bce1520d4b
3 changed files with 59 additions and 54 deletions

View File

@@ -120,14 +120,13 @@ JobQueue::Coro::post()
inline void
JobQueue::Coro::resume()
{
auto suspended = CoroState::Suspended;
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
{
auto suspended = CoroState::Suspended;
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
{
return;
}
cv_.notify_all();
return;
}
cv_.notify_all();
{
std::lock_guard lock(jq_.m_mutex);
jq_.m_suspendedCoros.erase(this);

View File

@@ -57,6 +57,13 @@ struct Coro_create_t
class JobQueue : private Workers::Callback
{
public:
enum class QueueState
{
Accepting,
Stopping,
Stopped
};
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
@@ -168,24 +175,16 @@ public:
@return true if jobHandler added to queue.
*/
template <
typename JobHandler,
typename = std::enable_if_t<std::is_same<
decltype(std::declval<JobHandler&&>()()),
void>::value>>
template <typename JobHandler>
bool
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (!accepting_)
if (queueState_ != QueueState::Accepting)
{
return false;
}
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
}
return false;
return addJobNoStatusCheck(type, name, std::forward<JobHandler>(jobHandler));
}
/** Creates a coroutine and adds a job to the queue which will run it.
@@ -244,13 +243,16 @@ public:
bool
isStopping() const
{
return stopping_;
return queueState_ == QueueState::Stopping;
}
// We may be able to move away from this, but we can keep it during the
// transition.
bool
isStopped() const;
isStopped() const
{
return queueState_ == QueueState::Stopped;
}
private:
friend class Coro;
@@ -262,9 +264,7 @@ private:
std::uint64_t m_lastJob;
std::set<Job> m_jobSet;
JobCounter jobCounter_;
std::atomic_bool accepting_ = true;
std::atomic_bool stopping_{false};
std::atomic_bool stopped_{false};
std::atomic<QueueState> queueState_{QueueState::Accepting};
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
@@ -286,30 +286,24 @@ private:
std::condition_variable cv_;
void
onStopResumeCoros(std::map<void*, std::weak_ptr<Coro>>& coros)
{
for (auto& [_, coro] : coros)
{
if (auto coroPtr = coro.lock())
{
if (auto optionalCountedJob =
jobCounter_.wrap([=]() { coroPtr->resume(); }))
{
addRefCountedJob(
coroPtr->type_,
coroPtr->name_,
std::move(*optionalCountedJob));
}
}
}
}
void
collect();
JobTypeData&
getJobTypeData(JobType type);
template <typename JobHandler>
bool
addJobNoStatusCheck(JobType type, std::string const& name, JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
}
return false;
}
// Adds a reference counted job to the JobQueue.
//
// param type The type of job.
@@ -447,7 +441,7 @@ template <class F>
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
if (!accepting_)
if (queueState_ != QueueState::Accepting)
{
return nullptr;
}

View File

@@ -29,7 +29,7 @@ namespace ripple {
bool
JobQueue::Coro::shouldStop() const
{
return jq_.stopping_ || jq_.stopped_ || !jq_.accepting_ || exiting_;
return jq_.queueState_ != QueueState::Accepting || exiting_;
}
JobQueue::JobQueue(
@@ -305,19 +305,33 @@ JobQueue::stop()
// get suspended and yield() will return immediately, so we can safely
// move m_suspendedCoros, and we can assume that no coroutine will be
// suspended in the future.
auto accepting = QueueState::Accepting;
if (!queueState_.compare_exchange_strong(accepting, QueueState::Stopping))
{
XRPL_ASSERT(false, "Incorrect queueState, should be accepting but not!");
}
std::map<void*, std::weak_ptr<Coro>> suspendedCoros;
{
std::unique_lock lock(m_mutex);
accepting_ = false;
suspendedCoros = std::move(m_suspendedCoros);
}
if (!suspendedCoros.empty())
{
// We should resume the suspended coroutines so that the coroutines
// get a chance to exit cleanly.
onStopResumeCoros(suspendedCoros);
for (auto& [_, coro] : suspendedCoros)
{
if (auto coroPtr = coro.lock())
{
// We don't allow any new jobs from outside when we are
// stopping, but we should allow new jobs from inside the class.
addJobNoStatusCheck(coroPtr->type_, coroPtr->name_, [coroPtr]() { coroPtr->resume(); });
}
}
}
stopping_ = true;
using namespace std::chrono_literals;
jobCounter_.join("JobQueue", 1s, m_journal);
{
@@ -337,14 +351,12 @@ JobQueue::stop()
m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
XRPL_ASSERT(
nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
stopped_ = true;
}
}
bool
JobQueue::isStopped() const
{
return stopped_;
auto stopping = QueueState::Stopping;
if (queueState_.compare_exchange_strong(stopping, QueueState::Stopped))
{
XRPL_ASSERT(false, "Incorrect queueState, should be stopping but not!");
}
}
void