mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-20 11:05:54 +00:00
Compare commits
24 Commits
pratik/Ret
...
a1q123456/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
472bcf6b03 | ||
|
|
3e4cb67db9 | ||
|
|
2acee44c29 | ||
|
|
ae351b81b4 | ||
|
|
fbe4f7dd9f | ||
|
|
bce1520d4b | ||
|
|
1dab9323a0 | ||
|
|
63ef46b676 | ||
|
|
6f0767a99e | ||
|
|
9a3a58d0f2 | ||
|
|
52439ebb2d | ||
|
|
622bb71cba | ||
|
|
528562792f | ||
|
|
34706ef0ac | ||
|
|
ab52fde56e | ||
|
|
671436c033 | ||
|
|
5f3b3a6a1e | ||
|
|
a70e60e0d8 | ||
|
|
7fb8f5f751 | ||
|
|
6fd30ebde1 | ||
|
|
a1f6580e54 | ||
|
|
32a3f0a867 | ||
|
|
ed6dcdb10f | ||
|
|
3adfa074bc |
@@ -101,12 +101,21 @@ public:
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
gate g;
|
gate g;
|
||||||
env.app().getJobQueue().postCoro(
|
gate gStart;
|
||||||
|
auto coro = env.app().getJobQueue().postCoro(
|
||||||
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
|
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
|
||||||
c->post();
|
gStart.signal();
|
||||||
c->yield();
|
c->yield();
|
||||||
g.signal();
|
g.signal();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Wait for the coroutine to start.
|
||||||
|
BEAST_EXPECT(gStart.wait_for(5s));
|
||||||
|
|
||||||
|
BEAST_EXPECT(coro->state() == JobQueue::Coro::CoroState::Suspended);
|
||||||
|
// Post the coroutine.
|
||||||
|
coro->post();
|
||||||
|
|
||||||
BEAST_EXPECT(g.wait_for(5s));
|
BEAST_EXPECT(g.wait_for(5s));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,12 +184,78 @@ public:
|
|||||||
BEAST_EXPECT(*lv == -1);
|
BEAST_EXPECT(*lv == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
stopJobQueueWhenCoroutineSuspended()
|
||||||
|
{
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
using namespace jtx;
|
||||||
|
|
||||||
|
testcase("Stop JobQueue when a coroutine is suspended");
|
||||||
|
|
||||||
|
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||||
|
cfg->FORCE_MULTI_THREAD = true;
|
||||||
|
return cfg;
|
||||||
|
}));
|
||||||
|
|
||||||
|
bool started = false;
|
||||||
|
bool finished = false;
|
||||||
|
std::optional<bool> shouldStop;
|
||||||
|
std::condition_variable cv;
|
||||||
|
std::mutex m;
|
||||||
|
std::unique_lock<std::mutex> lk(m);
|
||||||
|
auto coro = env.app().getJobQueue().postCoro(
|
||||||
|
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
|
||||||
|
started = true;
|
||||||
|
cv.notify_all();
|
||||||
|
c->yield();
|
||||||
|
finished = true;
|
||||||
|
shouldStop = c->shouldStop();
|
||||||
|
cv.notify_all();
|
||||||
|
});
|
||||||
|
|
||||||
|
cv.wait_for(lk, 5s, [&]() { return started; });
|
||||||
|
env.app().getJobQueue().stop();
|
||||||
|
|
||||||
|
cv.wait_for(lk, 5s, [&]() { return finished; });
|
||||||
|
BEAST_EXPECT(finished);
|
||||||
|
BEAST_EXPECT(shouldStop.has_value() && *shouldStop == true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
coroutineGetsDestroyedBeforeExecuting()
|
||||||
|
{
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
using namespace jtx;
|
||||||
|
|
||||||
|
testcase("Coroutine gets destroyed before executing");
|
||||||
|
|
||||||
|
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||||
|
cfg->FORCE_MULTI_THREAD = true;
|
||||||
|
return cfg;
|
||||||
|
}));
|
||||||
|
|
||||||
|
{
|
||||||
|
auto coro = std::make_shared<JobQueue::Coro>(
|
||||||
|
Coro_create_t{},
|
||||||
|
env.app().getJobQueue(),
|
||||||
|
JobType::jtCLIENT,
|
||||||
|
"test",
|
||||||
|
[](auto coro) {
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pass();
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
run() override
|
run() override
|
||||||
{
|
{
|
||||||
correct_order();
|
correct_order();
|
||||||
incorrect_order();
|
incorrect_order();
|
||||||
thread_specific_storage();
|
thread_specific_storage();
|
||||||
|
stopJobQueueWhenCoroutineSuspended();
|
||||||
|
coroutineGetsDestroyedBeforeExecuting();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -87,6 +87,8 @@ class JobQueue_test : public beast::unit_test::suite
|
|||||||
while (yieldCount == 0)
|
while (yieldCount == 0)
|
||||||
;
|
;
|
||||||
|
|
||||||
|
coro->join();
|
||||||
|
|
||||||
// Now re-post until the Coro says it is done.
|
// Now re-post until the Coro says it is done.
|
||||||
int old = yieldCount;
|
int old = yieldCount;
|
||||||
while (coro->runnable())
|
while (coro->runnable())
|
||||||
@@ -118,6 +120,9 @@ class JobQueue_test : public beast::unit_test::suite
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (yieldCount == 0)
|
||||||
|
; // We should wait for the job to start and yield
|
||||||
|
|
||||||
// Wait for the Job to run and yield.
|
// Wait for the Job to run and yield.
|
||||||
coro->join();
|
coro->join();
|
||||||
|
|
||||||
|
|||||||
@@ -34,17 +34,21 @@ JobQueue::Coro::Coro(
|
|||||||
: jq_(jq)
|
: jq_(jq)
|
||||||
, type_(type)
|
, type_(type)
|
||||||
, name_(name)
|
, name_(name)
|
||||||
, running_(false)
|
|
||||||
, coro_(
|
, coro_(
|
||||||
[this, fn = std::forward<F>(f)](
|
[this, fn = std::forward<F>(f)](
|
||||||
boost::coroutines::asymmetric_coroutine<void>::push_type&
|
boost::coroutines::asymmetric_coroutine<void>::push_type&
|
||||||
do_yield) {
|
do_yield) {
|
||||||
yield_ = &do_yield;
|
yield_ = &do_yield;
|
||||||
yield();
|
yield();
|
||||||
fn(shared_from_this());
|
// self makes Coro alive until this function returns
|
||||||
#ifndef NDEBUG
|
std::shared_ptr<Coro> self;
|
||||||
finished_ = true;
|
if (!shouldStop())
|
||||||
#endif
|
{
|
||||||
|
self = shared_from_this();
|
||||||
|
fn(self);
|
||||||
|
}
|
||||||
|
state_ = CoroState::Finished;
|
||||||
|
cv_.notify_all();
|
||||||
},
|
},
|
||||||
boost::coroutines::attributes(megabytes(1)))
|
boost::coroutines::attributes(megabytes(1)))
|
||||||
{
|
{
|
||||||
@@ -52,17 +56,35 @@ JobQueue::Coro::Coro(
|
|||||||
|
|
||||||
inline JobQueue::Coro::~Coro()
|
inline JobQueue::Coro::~Coro()
|
||||||
{
|
{
|
||||||
#ifndef NDEBUG
|
XRPL_ASSERT(
|
||||||
XRPL_ASSERT(finished_, "ripple::JobQueue::Coro::~Coro : is finished");
|
state_ != CoroState::Running,
|
||||||
#endif
|
"ripple::JobQueue::Coro::~Coro : is not running");
|
||||||
|
exiting_ = true;
|
||||||
|
// Resume the coroutine so that it has a chance to clean things up
|
||||||
|
if (state_ == CoroState::Suspended)
|
||||||
|
{
|
||||||
|
resume();
|
||||||
|
}
|
||||||
|
|
||||||
|
XRPL_ASSERT(
|
||||||
|
state_ == CoroState::Finished,
|
||||||
|
"ripple::JobQueue::Coro::~Coro : is finished");
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void
|
inline void
|
||||||
JobQueue::Coro::yield() const
|
JobQueue::Coro::yield()
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard lock(jq_.m_mutex);
|
std::lock_guard lock(jq_.m_mutex);
|
||||||
|
if (shouldStop())
|
||||||
|
return;
|
||||||
|
|
||||||
|
state_ = CoroState::Suspended;
|
||||||
|
cv_.notify_all();
|
||||||
|
|
||||||
++jq_.nSuspend_;
|
++jq_.nSuspend_;
|
||||||
|
jq_.m_suspendedCoros[this] = weak_from_this();
|
||||||
|
jq_.cv_.notify_all();
|
||||||
}
|
}
|
||||||
(*yield_)();
|
(*yield_)();
|
||||||
}
|
}
|
||||||
@@ -70,11 +92,20 @@ JobQueue::Coro::yield() const
|
|||||||
inline bool
|
inline bool
|
||||||
JobQueue::Coro::post()
|
JobQueue::Coro::post()
|
||||||
{
|
{
|
||||||
|
if (state_ == CoroState::Finished)
|
||||||
{
|
{
|
||||||
std::lock_guard lk(mutex_run_);
|
// The coroutine will run until it finishes if the JobQueue has stopped.
|
||||||
running_ = true;
|
// In the case where make_shared<Coro>() succeeds and then the JobQueue
|
||||||
|
// stops before coro_ gets executed, post() will still be called and
|
||||||
|
// state_ will be Finished. We should return false and avoid XRPL_ASSERT
|
||||||
|
// as it's a valid edge case.
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
XRPL_ASSERT(
|
||||||
|
state_ == CoroState::Suspended,
|
||||||
|
"ripple::JobQueue::Coro::post : should be suspended");
|
||||||
|
|
||||||
// sp keeps 'this' alive
|
// sp keeps 'this' alive
|
||||||
if (jq_.addJob(
|
if (jq_.addJob(
|
||||||
type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
type_, name_, [this, sp = shared_from_this()]() { resume(); }))
|
||||||
@@ -82,9 +113,6 @@ JobQueue::Coro::post()
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The coroutine will not run. Clean up running_.
|
|
||||||
std::lock_guard lk(mutex_run_);
|
|
||||||
running_ = false;
|
|
||||||
cv_.notify_all();
|
cv_.notify_all();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -92,13 +120,18 @@ JobQueue::Coro::post()
|
|||||||
inline void
|
inline void
|
||||||
JobQueue::Coro::resume()
|
JobQueue::Coro::resume()
|
||||||
{
|
{
|
||||||
|
auto suspended = CoroState::Suspended;
|
||||||
|
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
|
||||||
{
|
{
|
||||||
std::lock_guard lk(mutex_run_);
|
return;
|
||||||
running_ = true;
|
|
||||||
}
|
}
|
||||||
|
cv_.notify_all();
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(jq_.m_mutex);
|
std::lock_guard lock(jq_.m_mutex);
|
||||||
|
jq_.m_suspendedCoros.erase(this);
|
||||||
--jq_.nSuspend_;
|
--jq_.nSuspend_;
|
||||||
|
jq_.cv_.notify_all();
|
||||||
}
|
}
|
||||||
auto saved = detail::getLocalValues().release();
|
auto saved = detail::getLocalValues().release();
|
||||||
detail::getLocalValues().reset(&lvs_);
|
detail::getLocalValues().reset(&lvs_);
|
||||||
@@ -109,43 +142,24 @@ JobQueue::Coro::resume()
|
|||||||
coro_();
|
coro_();
|
||||||
detail::getLocalValues().release();
|
detail::getLocalValues().release();
|
||||||
detail::getLocalValues().reset(saved);
|
detail::getLocalValues().reset(saved);
|
||||||
std::lock_guard lk(mutex_run_);
|
|
||||||
running_ = false;
|
|
||||||
cv_.notify_all();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool
|
inline bool
|
||||||
JobQueue::Coro::runnable() const
|
JobQueue::Coro::runnable() const
|
||||||
{
|
{
|
||||||
return static_cast<bool>(coro_);
|
// There's an edge case where the coroutine has updated the status
|
||||||
}
|
// to Finished but the function hasn't exited and therefore, coro_ is
|
||||||
|
// still valid. However, the coroutine is not technically runnable in this
|
||||||
inline void
|
// case, because the coroutine is about to exit and static_cast<bool>(coro_)
|
||||||
JobQueue::Coro::expectEarlyExit()
|
// is going to be false.
|
||||||
{
|
return static_cast<bool>(coro_) && state_ != CoroState::Finished;
|
||||||
#ifndef NDEBUG
|
|
||||||
if (!finished_)
|
|
||||||
#endif
|
|
||||||
{
|
|
||||||
// expectEarlyExit() must only ever be called from outside the
|
|
||||||
// Coro's stack. It you're inside the stack you can simply return
|
|
||||||
// and be done.
|
|
||||||
//
|
|
||||||
// That said, since we're outside the Coro's stack, we need to
|
|
||||||
// decrement the nSuspend that the Coro's call to yield caused.
|
|
||||||
std::lock_guard lock(jq_.m_mutex);
|
|
||||||
--jq_.nSuspend_;
|
|
||||||
#ifndef NDEBUG
|
|
||||||
finished_ = true;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void
|
inline void
|
||||||
JobQueue::Coro::join()
|
JobQueue::Coro::join()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(mutex_run_);
|
std::unique_lock<std::mutex> lk(mutex_run_);
|
||||||
cv_.wait(lk, [this]() { return running_ == false; });
|
cv_.wait(lk, [this]() { return state_ != CoroState::Running; });
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace ripple
|
} // namespace ripple
|
||||||
|
|||||||
@@ -57,23 +57,28 @@ struct Coro_create_t
|
|||||||
class JobQueue : private Workers::Callback
|
class JobQueue : private Workers::Callback
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
enum class QueueState { Accepting, Stopping, Stopped };
|
||||||
|
|
||||||
/** Coroutines must run to completion. */
|
/** Coroutines must run to completion. */
|
||||||
class Coro : public std::enable_shared_from_this<Coro>
|
class Coro : public std::enable_shared_from_this<Coro>
|
||||||
{
|
{
|
||||||
|
friend class JobQueue;
|
||||||
|
|
||||||
|
public:
|
||||||
|
enum class CoroState { None, Suspended, Running, Finished };
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
std::atomic_bool exiting_ = false;
|
||||||
detail::LocalValues lvs_;
|
detail::LocalValues lvs_;
|
||||||
JobQueue& jq_;
|
JobQueue& jq_;
|
||||||
JobType type_;
|
JobType type_;
|
||||||
std::string name_;
|
std::string name_;
|
||||||
bool running_;
|
std::atomic<CoroState> state_ = CoroState::None;
|
||||||
std::mutex mutex_;
|
std::mutex mutex_;
|
||||||
std::mutex mutex_run_;
|
std::mutex mutex_run_;
|
||||||
std::condition_variable cv_;
|
std::condition_variable cv_;
|
||||||
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
|
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
|
||||||
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
|
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
|
||||||
#ifndef NDEBUG
|
|
||||||
bool finished_ = false;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
// Private: Used in the implementation
|
// Private: Used in the implementation
|
||||||
@@ -97,7 +102,7 @@ public:
|
|||||||
post.
|
post.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
yield() const;
|
yield();
|
||||||
|
|
||||||
/** Schedule coroutine execution.
|
/** Schedule coroutine execution.
|
||||||
Effects:
|
Effects:
|
||||||
@@ -127,17 +132,23 @@ public:
|
|||||||
void
|
void
|
||||||
resume();
|
resume();
|
||||||
|
|
||||||
|
CoroState
|
||||||
|
state() const
|
||||||
|
{
|
||||||
|
return state_;
|
||||||
|
}
|
||||||
|
|
||||||
/** Returns true if the Coro is still runnable (has not returned). */
|
/** Returns true if the Coro is still runnable (has not returned). */
|
||||||
bool
|
bool
|
||||||
runnable() const;
|
runnable() const;
|
||||||
|
|
||||||
/** Once called, the Coro allows early exit without an assert. */
|
|
||||||
void
|
|
||||||
expectEarlyExit();
|
|
||||||
|
|
||||||
/** Waits until coroutine returns from the user function. */
|
/** Waits until coroutine returns from the user function. */
|
||||||
void
|
void
|
||||||
join();
|
join();
|
||||||
|
|
||||||
|
/** Returns true if the coroutine should stop executing */
|
||||||
|
bool
|
||||||
|
shouldStop() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
using JobFunction = std::function<void()>;
|
using JobFunction = std::function<void()>;
|
||||||
@@ -159,21 +170,18 @@ public:
|
|||||||
|
|
||||||
@return true if jobHandler added to queue.
|
@return true if jobHandler added to queue.
|
||||||
*/
|
*/
|
||||||
template <
|
template <typename JobHandler>
|
||||||
typename JobHandler,
|
|
||||||
typename = std::enable_if_t<std::is_same<
|
|
||||||
decltype(std::declval<JobHandler&&>()()),
|
|
||||||
void>::value>>
|
|
||||||
bool
|
bool
|
||||||
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
|
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
|
||||||
|
requires std::is_void_v<std::invoke_result_t<JobHandler>>
|
||||||
{
|
{
|
||||||
if (auto optionalCountedJob =
|
if (queueState_ != QueueState::Accepting)
|
||||||
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
|
|
||||||
{
|
{
|
||||||
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
|
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
return addJobNoStatusCheck(
|
||||||
|
type, name, std::forward<JobHandler>(jobHandler));
|
||||||
|
}
|
||||||
|
|
||||||
/** Creates a coroutine and adds a job to the queue which will run it.
|
/** Creates a coroutine and adds a job to the queue which will run it.
|
||||||
|
|
||||||
@@ -231,13 +239,16 @@ public:
|
|||||||
bool
|
bool
|
||||||
isStopping() const
|
isStopping() const
|
||||||
{
|
{
|
||||||
return stopping_;
|
return queueState_ == QueueState::Stopping;
|
||||||
}
|
}
|
||||||
|
|
||||||
// We may be able to move away from this, but we can keep it during the
|
// We may be able to move away from this, but we can keep it during the
|
||||||
// transition.
|
// transition.
|
||||||
bool
|
bool
|
||||||
isStopped() const;
|
isStopped() const
|
||||||
|
{
|
||||||
|
return queueState_ == QueueState::Stopped;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class Coro;
|
friend class Coro;
|
||||||
@@ -249,8 +260,7 @@ private:
|
|||||||
std::uint64_t m_lastJob;
|
std::uint64_t m_lastJob;
|
||||||
std::set<Job> m_jobSet;
|
std::set<Job> m_jobSet;
|
||||||
JobCounter jobCounter_;
|
JobCounter jobCounter_;
|
||||||
std::atomic_bool stopping_{false};
|
std::atomic<QueueState> queueState_{QueueState::Accepting};
|
||||||
std::atomic_bool stopped_{false};
|
|
||||||
JobDataMap m_jobData;
|
JobDataMap m_jobData;
|
||||||
JobTypeData m_invalidJobData;
|
JobTypeData m_invalidJobData;
|
||||||
|
|
||||||
@@ -260,6 +270,8 @@ private:
|
|||||||
// The number of suspended coroutines
|
// The number of suspended coroutines
|
||||||
int nSuspend_ = 0;
|
int nSuspend_ = 0;
|
||||||
|
|
||||||
|
std::map<void*, std::weak_ptr<Coro>> m_suspendedCoros;
|
||||||
|
|
||||||
Workers m_workers;
|
Workers m_workers;
|
||||||
|
|
||||||
// Statistics tracking
|
// Statistics tracking
|
||||||
@@ -275,6 +287,22 @@ private:
|
|||||||
JobTypeData&
|
JobTypeData&
|
||||||
getJobTypeData(JobType type);
|
getJobTypeData(JobType type);
|
||||||
|
|
||||||
|
template <typename JobHandler>
|
||||||
|
bool
|
||||||
|
addJobNoStatusCheck(
|
||||||
|
JobType type,
|
||||||
|
std::string const& name,
|
||||||
|
JobHandler&& jobHandler)
|
||||||
|
requires std::is_void_v<std::invoke_result_t<JobHandler>>
|
||||||
|
{
|
||||||
|
if (auto optionalCountedJob =
|
||||||
|
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
|
||||||
|
{
|
||||||
|
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// Adds a reference counted job to the JobQueue.
|
// Adds a reference counted job to the JobQueue.
|
||||||
//
|
//
|
||||||
// param type The type of job.
|
// param type The type of job.
|
||||||
@@ -412,6 +440,10 @@ template <class F>
|
|||||||
std::shared_ptr<JobQueue::Coro>
|
std::shared_ptr<JobQueue::Coro>
|
||||||
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
||||||
{
|
{
|
||||||
|
if (queueState_ != QueueState::Accepting)
|
||||||
|
{
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
/* First param is a detail type to make construction private.
|
/* First param is a detail type to make construction private.
|
||||||
Last param is the function the coroutine runs. Signature of
|
Last param is the function the coroutine runs. Signature of
|
||||||
void(std::shared_ptr<Coro>).
|
void(std::shared_ptr<Coro>).
|
||||||
@@ -422,7 +454,6 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
|
|||||||
{
|
{
|
||||||
// The Coro was not successfully posted. Disable it so it's destructor
|
// The Coro was not successfully posted. Disable it so it's destructor
|
||||||
// can run with no negative side effects. Then destroy it.
|
// can run with no negative side effects. Then destroy it.
|
||||||
coro->expectEarlyExit();
|
|
||||||
coro.reset();
|
coro.reset();
|
||||||
}
|
}
|
||||||
return coro;
|
return coro;
|
||||||
|
|||||||
@@ -26,6 +26,12 @@
|
|||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
|
|
||||||
|
bool
|
||||||
|
JobQueue::Coro::shouldStop() const
|
||||||
|
{
|
||||||
|
return jq_.queueState_ != QueueState::Accepting || exiting_;
|
||||||
|
}
|
||||||
|
|
||||||
JobQueue::JobQueue(
|
JobQueue::JobQueue(
|
||||||
int threadCount,
|
int threadCount,
|
||||||
beast::insight::Collector::ptr const& collector,
|
beast::insight::Collector::ptr const& collector,
|
||||||
@@ -295,7 +301,45 @@ JobQueue::getJobTypeData(JobType type)
|
|||||||
void
|
void
|
||||||
JobQueue::stop()
|
JobQueue::stop()
|
||||||
{
|
{
|
||||||
stopping_ = true;
|
// Once we stop accepting new jobs, all running coroutines won't be able to
|
||||||
|
// get suspended and yield() will return immediately, so we can safely
|
||||||
|
// move m_suspendedCoros, and we can assume that no coroutine will be
|
||||||
|
// suspended in the future.
|
||||||
|
if (queueState_ == QueueState::Stopped)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto accepting = QueueState::Accepting;
|
||||||
|
|
||||||
|
if (!queueState_.compare_exchange_strong(accepting, QueueState::Stopping))
|
||||||
|
{
|
||||||
|
XRPL_ASSERT(
|
||||||
|
false, "Incorrect queueState, should be accepting but not!");
|
||||||
|
}
|
||||||
|
std::map<void*, std::weak_ptr<Coro>> suspendedCoros;
|
||||||
|
{
|
||||||
|
std::unique_lock lock(m_mutex);
|
||||||
|
suspendedCoros = std::move(m_suspendedCoros);
|
||||||
|
}
|
||||||
|
if (!suspendedCoros.empty())
|
||||||
|
{
|
||||||
|
// We should resume the suspended coroutines so that the coroutines
|
||||||
|
// get a chance to exit cleanly.
|
||||||
|
for (auto& [_, coro] : suspendedCoros)
|
||||||
|
{
|
||||||
|
if (auto coroPtr = coro.lock())
|
||||||
|
{
|
||||||
|
// We don't allow any new jobs from outside when we are
|
||||||
|
// stopping, but we should allow new jobs from inside the class.
|
||||||
|
addJobNoStatusCheck(
|
||||||
|
coroPtr->type_, coroPtr->name_, [coroPtr]() {
|
||||||
|
coroPtr->resume();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
jobCounter_.join("JobQueue", 1s, m_journal);
|
jobCounter_.join("JobQueue", 1s, m_journal);
|
||||||
{
|
{
|
||||||
@@ -305,8 +349,9 @@ JobQueue::stop()
|
|||||||
// `Job::doJob` and the return of `JobQueue::processTask`. That is why
|
// `Job::doJob` and the return of `JobQueue::processTask`. That is why
|
||||||
// we must wait on the condition variable to make these assertions.
|
// we must wait on the condition variable to make these assertions.
|
||||||
std::unique_lock<std::mutex> lock(m_mutex);
|
std::unique_lock<std::mutex> lock(m_mutex);
|
||||||
cv_.wait(
|
cv_.wait(lock, [this] {
|
||||||
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
|
return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty();
|
||||||
|
});
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
m_processCount == 0,
|
m_processCount == 0,
|
||||||
"ripple::JobQueue::stop : all processes completed");
|
"ripple::JobQueue::stop : all processes completed");
|
||||||
@@ -314,14 +359,12 @@ JobQueue::stop()
|
|||||||
m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
|
m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
|
nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
|
||||||
stopped_ = true;
|
|
||||||
}
|
}
|
||||||
}
|
auto stopping = QueueState::Stopping;
|
||||||
|
if (!queueState_.compare_exchange_strong(stopping, QueueState::Stopped))
|
||||||
bool
|
|
||||||
JobQueue::isStopped() const
|
|
||||||
{
|
{
|
||||||
return stopped_;
|
XRPL_ASSERT(false, "Incorrect queueState, should be stopping but not!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|||||||
@@ -128,21 +128,17 @@ doRipplePathFind(RPC::JsonContext& context)
|
|||||||
// May 2017
|
// May 2017
|
||||||
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
|
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
|
||||||
request,
|
request,
|
||||||
[&context]() {
|
[coro = context.coro]() {
|
||||||
// Copying the shared_ptr keeps the coroutine alive up
|
// Capturing the shared_ptr keeps the coroutine alive up
|
||||||
// through the return. Otherwise the storage under the
|
// through the return. Otherwise the storage under the
|
||||||
// captured reference could evaporate when we return from
|
// captured reference could evaporate when we return from
|
||||||
// coroCopy->resume(). This is not strictly necessary, but
|
// coro->post().
|
||||||
// will make maintenance easier.
|
// When post() failed, we won't get a thread to let
|
||||||
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
|
// the Coro finish. We should ignore the coroutine and
|
||||||
if (!coroCopy->post())
|
// let it destruct, as the JobQueu has been signaled to
|
||||||
{
|
// close, and resuming it manually messes up the internal
|
||||||
// The post() failed, so we won't get a thread to let
|
// state in JobQueue.
|
||||||
// the Coro finish. We'll call Coro::resume() so the
|
coro->post();
|
||||||
// Coro can finish on our thread. Otherwise the
|
|
||||||
// application will hang on shutdown.
|
|
||||||
coroCopy->resume();
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
context.consumer,
|
context.consumer,
|
||||||
lpLedger,
|
lpLedger,
|
||||||
@@ -150,6 +146,14 @@ doRipplePathFind(RPC::JsonContext& context)
|
|||||||
if (request)
|
if (request)
|
||||||
{
|
{
|
||||||
context.coro->yield();
|
context.coro->yield();
|
||||||
|
// Each time after we resume from yield(), we should
|
||||||
|
// check if cancellation has been requested. It would
|
||||||
|
// be a lot more elegant if we replace boost coroutine
|
||||||
|
// with c++ standard coroutine.
|
||||||
|
if (context.coro->shouldStop())
|
||||||
|
{
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
jvResult = request->doStatus(context.params);
|
jvResult = request->doStatus(context.params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user