Compare commits


3 Commits

Author SHA1 Message Date
Bart
0586b5678e ci: Pass missing sanitizers input to actions (#6266)
The `upload-conan-deps` workflow that's triggered on push is supposed to upload the Conan dependencies to our remote, so future PR commits can pull those dependencies from the remote. However, because the `sanitizers` argument was missing, it was building different dependencies than the PRs build for the asan/tsan/ubsan jobs, so those jobs would not find anything in the remote that they could use. This change sets the missing `sanitizers` input variable when running the `build-deps` action. 

Separately, the `setup-conan` action was showing the default profile, even though we use the `ci` profile. To ensure the profile is printed correctly when sanitizers are enabled, the environment variable that the profile reads is now set before calling the action.
2026-01-23 06:40:55 -05:00
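
A minimal sketch of the resulting wiring, assuming a reusable workflow that declares a `sanitizers` input; the job name, checkout step, and action paths are illustrative, while the `SANITIZERS` environment variable and the `sanitizers` input mirror the diff below:

```yaml
jobs:
  build-dependencies:              # illustrative job name
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Conan
        env:
          # The profile reads this variable, so it must be set before the
          # action prints the profile.
          SANITIZERS: ${{ inputs.sanitizers }}
        uses: ./.github/actions/setup-conan
      - name: Build dependencies
        uses: ./.github/actions/build-deps   # assumed action path
        with:
          # Forward the same value so push builds produce the same packages
          # as the asan/tsan/ubsan PR jobs and can share the remote cache.
          sanitizers: ${{ inputs.sanitizers }}
```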
Bart
66158d786f ci: Properly propagate Conan credentials (#6265)
The export and upload steps were initially in a separate action, where GitHub Actions does not support the `secrets` keyword and credentials can only be passed via `inputs`. After they were moved to a reusable workflow, only some of the references to the credentials were updated. This change correctly references the Conan credentials via `secrets` instead of `inputs`.
2026-01-22 16:05:15 -05:00
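
For context, a hedged sketch of the distinction: a composite action only receives values through `inputs`, while a reusable workflow declares and reads credentials through the `secrets` context. The workflow shape, secret names, and remote name here are illustrative; only the `conan remote login` step mirrors the diff:

```yaml
# Inside the reusable workflow: declare the secrets it accepts ...
on:
  workflow_call:
    secrets:
      remote_username:
        required: true
      remote_password:
        required: true

jobs:
  upload:
    runs-on: ubuntu-latest
    steps:
      - name: Log into Conan remote
        env:
          REMOTE_NAME: xrplf   # assumed remote name
          # ... and reference them via the `secrets` context. In a composite
          # action only `inputs` exists; in a reusable workflow the
          # credentials arrive through `secrets`, so `inputs.remote_username`
          # would not carry them here.
          REMOTE_USERNAME: ${{ secrets.remote_username }}
          REMOTE_PASSWORD: ${{ secrets.remote_password }}
        run: conan remote login "${REMOTE_NAME}" "${REMOTE_USERNAME}" --password "${REMOTE_PASSWORD}"
```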
Bart
c57ffdbcb8 ci: Explicitly set version when exporting the Conan recipe (#6264)
By default the Conan recipe extracts the version from `BuildInfo.cpp`, but in some cases we want to upload a recipe whose version carries a suffix derived from the commit hash. This currently causes the upload to fail, since there is a version mismatch.

Here we explicitly set the version, and then simplify the steps in the upload workflow, since all we now need are the recipe name (embedded within the conanfile.py but also needed when uploading), the recipe version, and the recipe ref (name/version).
2026-01-22 19:05:59 +00:00
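
A sketch of the simplified upload step under these assumptions: a prior `generate-version` step with id `version` emits the (possibly commit-suffixed) version, and the recipe name is `xrpl`. The env-var indirection is illustrative; the Conan commands mirror the diff below:

```yaml
- name: Upload Conan recipe
  env:
    REMOTE_NAME: ${{ inputs.remote_name }}
    VERSION: ${{ steps.version.outputs.version }}   # may carry a commit-hash suffix
  run: |
    # Passing --version overrides the value the recipe would otherwise
    # derive from BuildInfo.cpp, so export and upload agree on the ref.
    conan export . --version="${VERSION}"
    conan upload --confirm --check --remote="${REMOTE_NAME}" "xrpl/${VERSION}"
```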
10 changed files with 104 additions and 280 deletions

View File

@@ -31,7 +31,7 @@ runs:
conan config install conan/profiles/ -tf $(conan config home)/profiles/
echo 'Conan profile:'
conan profile show
conan profile show --profile ci
- name: Set up Conan remote
shell: bash

View File

@@ -125,6 +125,8 @@ jobs:
subtract: ${{ inputs.nproc_subtract }}
- name: Setup Conan
env:
SANITIZERS: ${{ inputs.sanitizers }}
uses: ./.github/actions/setup-conan
- name: Build dependencies

View File

@@ -49,10 +49,6 @@ jobs:
id: version
uses: ./.github/actions/generate-version
- name: Determine recipe reference
id: ref
run: echo "ref=xrpl/${{ steps.version.outputs.version }}" >> "${GITHUB_OUTPUT}"
- name: Set up Conan
uses: ./.github/actions/setup-conan
with:
@@ -62,17 +58,16 @@ jobs:
- name: Log into Conan remote
env:
REMOTE_NAME: ${{ inputs.remote_name }}
REMOTE_USERNAME: ${{ inputs.remote_username }}
REMOTE_PASSWORD: ${{ inputs.remote_password }}
REMOTE_USERNAME: ${{ secrets.remote_username }}
REMOTE_PASSWORD: ${{ secrets.remote_password }}
run: conan remote login "${REMOTE_NAME}" "${REMOTE_USERNAME}" --password "${REMOTE_PASSWORD}"
- name: Upload Conan recipe
env:
RECIPE_REF: ${{ steps.ref.outputs.ref }}
REMOTE_NAME: ${{ inputs.remote_name }}
run: |
conan export .
conan upload --confirm --check --remote="${REMOTE_NAME}" ${RECIPE_REF}
conan export . --version=${{ steps.version.outputs.version }}
conan upload --confirm --check --remote="${REMOTE_NAME}" xrpl/${{ steps.version.outputs.version }}
outputs:
ref: ${{ steps.ref.outputs.ref }}
ref: xrpl/${{ steps.version.outputs.version }}

View File

@@ -84,6 +84,8 @@ jobs:
subtract: ${{ env.NPROC_SUBTRACT }}
- name: Setup Conan
env:
SANITIZERS: ${{ matrix.sanitizers }}
uses: ./.github/actions/setup-conan
with:
remote_name: ${{ env.CONAN_REMOTE_NAME }}
@@ -98,6 +100,7 @@ jobs:
# Set the verbosity to "quiet" for Windows to avoid an excessive
# amount of logs. For other OSes, the "verbose" logs are more useful.
log_verbosity: ${{ runner.os == 'Windows' && 'quiet' || 'verbose' }}
sanitizers: ${{ matrix.sanitizers }}
- name: Log into Conan remote
if: ${{ github.repository_owner == 'XRPLF' && (github.event_name == 'push' || github.event_name == 'workflow_dispatch') }}

View File

@@ -15,21 +15,17 @@ JobQueue::Coro::Coro(
: jq_(jq)
, type_(type)
, name_(name)
, running_(false)
, coro_(
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type&
do_yield) {
yield_ = &do_yield;
yield();
// self makes Coro alive until this function returns
std::shared_ptr<Coro> self;
if (!shouldStop())
{
self = shared_from_this();
fn(self);
}
state_ = CoroState::Finished;
cv_.notify_all();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
},
boost::coroutines::attributes(megabytes(1)))
{
@@ -37,58 +33,29 @@ JobQueue::Coro::Coro(
inline JobQueue::Coro::~Coro()
{
XRPL_ASSERT(
state_ != CoroState::Running,
"xrpl::JobQueue::Coro::~Coro : is not running");
exiting_ = true;
// Resume the coroutine so that it has a chance to clean things up
if (state_ == CoroState::Suspended)
{
resume();
}
XRPL_ASSERT(
state_ == CoroState::Finished,
"xrpl::JobQueue::Coro::~Coro : is finished");
#ifndef NDEBUG
XRPL_ASSERT(finished_, "xrpl::JobQueue::Coro::~Coro : is finished");
#endif
}
inline bool
JobQueue::Coro::yield()
inline void
JobQueue::Coro::yield() const
{
{
std::lock_guard lock(jq_.m_mutex);
if (shouldStop())
return false;
state_ = CoroState::Suspended;
cv_.notify_all();
++jq_.nSuspend_;
jq_.m_suspendedCoros[this] = weak_from_this();
jq_.cv_.notify_all();
}
(*yield_)();
return true;
}
inline bool
JobQueue::Coro::post()
{
if (state_ == CoroState::Finished)
{
// The coroutine will run until it finishes if the JobQueue has stopped.
// In the case where make_shared<Coro>() succeeds and then the JobQueue
// stops before coro_ gets executed, post() will still be called and
// state_ will be Finished. We should return false and avoid XRPL_ASSERT
// as it's a valid edge case.
return false;
std::lock_guard lk(mutex_run_);
running_ = true;
}
XRPL_ASSERT(
state_ == CoroState::Suspended,
"ripple::JobQueue::Coro::post : should be suspended");
// sp keeps 'this' alive
if (jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() { resume(); }))
@@ -96,6 +63,9 @@ JobQueue::Coro::post()
return true;
}
// The coroutine will not run. Clean up running_.
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
@@ -103,18 +73,13 @@ JobQueue::Coro::post()
inline void
JobQueue::Coro::resume()
{
auto suspended = CoroState::Suspended;
if (!state_.compare_exchange_strong(suspended, CoroState::Running))
{
return;
std::lock_guard lk(mutex_run_);
running_ = true;
}
cv_.notify_all();
{
std::lock_guard lock(jq_.m_mutex);
jq_.m_suspendedCoros.erase(this);
--jq_.nSuspend_;
jq_.cv_.notify_all();
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
@@ -124,24 +89,43 @@ JobQueue::Coro::resume()
coro_();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
}
inline bool
JobQueue::Coro::runnable() const
{
// There's an edge case where the coroutine has updated the status
// to Finished but the function hasn't exited and therefore, coro_ is
// still valid. However, the coroutine is not technically runnable in this
// case, because the coroutine is about to exit and static_cast<bool>(coro_)
// is going to be false.
return static_cast<bool>(coro_) && state_ != CoroState::Finished;
return static_cast<bool>(coro_);
}
inline void
JobQueue::Coro::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
// expectEarlyExit() must only ever be called from outside the
// Coro's stack. If you're inside the stack you can simply return
// and be done.
//
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
}
inline void
JobQueue::Coro::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return state_ != CoroState::Running; });
cv_.wait(lk, [this]() { return running_ == false; });
}
} // namespace xrpl

View File

@@ -37,28 +37,23 @@ struct Coro_create_t
class JobQueue : private Workers::Callback
{
public:
enum class QueueState { Accepting, Stopping, Stopped };
/** Coroutines must run to completion. */
class Coro : public std::enable_shared_from_this<Coro>
{
friend class JobQueue;
public:
enum class CoroState { None, Suspended, Running, Finished };
private:
std::atomic_bool exiting_ = false;
detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
std::atomic<CoroState> state_ = CoroState::None;
bool running_;
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif
public:
// Private: Used in the implementation
@@ -79,12 +74,10 @@ public:
Note:
The associated Job function returns.
Undefined behavior if called consecutively without a corresponding
post.
It may not suspend at all if the JobQueue is stopping, and returns
false in such a case.
post.
*/
bool
yield();
void
yield() const;
/** Schedule coroutine execution.
Effects:
@@ -114,23 +107,17 @@ public:
void
resume();
CoroState
state() const
{
return state_;
}
/** Returns true if the Coro is still runnable (has not returned). */
bool
runnable() const;
/** Once called, the Coro allows early exit without an assert. */
void
expectEarlyExit();
/** Waits until coroutine returns from the user function. */
void
join();
/** Returns true if the coroutine should stop executing */
[[nodiscard]] bool
shouldStop() const;
};
using JobFunction = std::function<void()>;
@@ -152,17 +139,20 @@ public:
@return true if jobHandler added to queue.
*/
template <typename JobHandler>
template <
typename JobHandler,
typename = std::enable_if_t<std::is_same<
decltype(std::declval<JobHandler&&>()()),
void>::value>>
bool
addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (queueState_ != QueueState::Accepting)
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
return false;
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
}
return addJobNoStatusCheck(
type, name, std::forward<JobHandler>(jobHandler));
return false;
}
/** Creates a coroutine and adds a job to the queue which will run it.
@@ -221,16 +211,13 @@ public:
bool
isStopping() const
{
return queueState_ == QueueState::Stopping;
return stopping_;
}
// We may be able to move away from this, but we can keep it during the
// transition.
bool
isStopped() const
{
return queueState_ == QueueState::Stopped;
}
isStopped() const;
private:
friend class Coro;
@@ -242,7 +229,8 @@ private:
std::uint64_t m_lastJob;
std::set<Job> m_jobSet;
JobCounter jobCounter_;
std::atomic<QueueState> queueState_{QueueState::Accepting};
std::atomic_bool stopping_{false};
std::atomic_bool stopped_{false};
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
@@ -252,8 +240,6 @@ private:
// The number of suspended coroutines
int nSuspend_ = 0;
std::map<void*, std::weak_ptr<Coro>> m_suspendedCoros;
Workers m_workers;
// Statistics tracking
@@ -269,22 +255,6 @@ private:
JobTypeData&
getJobTypeData(JobType type);
template <typename JobHandler>
bool
addJobNoStatusCheck(
JobType type,
std::string const& name,
JobHandler&& jobHandler)
requires std::is_void_v<std::invoke_result_t<JobHandler>>
{
if (auto optionalCountedJob =
jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
{
return addRefCountedJob(type, name, std::move(*optionalCountedJob));
}
return false;
}
// Adds a reference counted job to the JobQueue.
//
// param type The type of job.
@@ -422,10 +392,6 @@ template <class F>
std::shared_ptr<JobQueue::Coro>
JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
if (queueState_ != QueueState::Accepting)
{
return nullptr;
}
/* First param is a detail type to make construction private.
Last param is the function the coroutine runs. Signature of
void(std::shared_ptr<Coro>).
@@ -436,6 +402,7 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
{
// The Coro was not successfully posted. Disable it so its destructor
// can run with no negative side effects. Then destroy it.
coro->expectEarlyExit();
coro.reset();
}
return coro;

View File

@@ -6,12 +6,6 @@
namespace xrpl {
bool
JobQueue::Coro::shouldStop() const
{
return jq_.queueState_ != QueueState::Accepting || exiting_;
}
JobQueue::JobQueue(
int threadCount,
beast::insight::Collector::ptr const& collector,
@@ -280,45 +274,7 @@ JobQueue::getJobTypeData(JobType type)
void
JobQueue::stop()
{
// Once we stop accepting new jobs, all running coroutines won't be able to
// get suspended and yield() will return immediately, so we can safely
// move m_suspendedCoros, and we can assume that no coroutine will be
// suspended in the future.
if (queueState_ == QueueState::Stopped)
{
return;
}
auto accepting = QueueState::Accepting;
if (!queueState_.compare_exchange_strong(accepting, QueueState::Stopping))
{
XRPL_ASSERT(
false, "Incorrect queueState, should be accepting but not!");
}
std::map<void*, std::weak_ptr<Coro>> suspendedCoros;
{
std::unique_lock lock(m_mutex);
suspendedCoros = std::move(m_suspendedCoros);
}
if (!suspendedCoros.empty())
{
// We should resume the suspended coroutines so that the coroutines
// get a chance to exit cleanly.
for (auto& [_, coro] : suspendedCoros)
{
if (auto coroPtr = coro.lock())
{
// We don't allow any new jobs from outside when we are
// stopping, but we should allow new jobs from inside the class.
addJobNoStatusCheck(
coroPtr->type_, coroPtr->name_, [coroPtr]() {
coroPtr->resume();
});
}
}
}
stopping_ = true;
using namespace std::chrono_literals;
jobCounter_.join("JobQueue", 1s, m_journal);
{
@@ -328,9 +284,8 @@ JobQueue::stop()
// `Job::doJob` and the return of `JobQueue::processTask`. That is why
// we must wait on the condition variable to make these assertions.
std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait(lock, [this] {
return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty();
});
cv_.wait(
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
XRPL_ASSERT(
m_processCount == 0,
"xrpl::JobQueue::stop : all processes completed");
@@ -338,12 +293,14 @@ JobQueue::stop()
m_jobSet.empty(), "xrpl::JobQueue::stop : all jobs completed");
XRPL_ASSERT(
nSuspend_ == 0, "xrpl::JobQueue::stop : no coros suspended");
stopped_ = true;
}
auto stopping = QueueState::Stopping;
if (!queueState_.compare_exchange_strong(stopping, QueueState::Stopped))
{
XRPL_ASSERT(false, "Incorrect queueState, should be stopping but not!");
}
}
bool
JobQueue::isStopped() const
{
return stopped_;
}
void

View File

@@ -82,21 +82,12 @@ public:
}));
gate g;
gate gStart;
auto coro = env.app().getJobQueue().postCoro(
env.app().getJobQueue().postCoro(
jtCLIENT, "CoroTest", [&](auto const& c) {
gStart.signal();
c->post();
c->yield();
g.signal();
});
// Wait for the coroutine to start.
BEAST_EXPECT(gStart.wait_for(5s));
BEAST_EXPECT(coro->state() == JobQueue::Coro::CoroState::Suspended);
// Post the coroutine.
coro->post();
BEAST_EXPECT(g.wait_for(5s));
}
@@ -165,78 +156,12 @@ public:
BEAST_EXPECT(*lv == -1);
}
void
stopJobQueueWhenCoroutineSuspended()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Stop JobQueue when a coroutine is suspended");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
bool started = false;
bool finished = false;
std::optional<bool> shouldStop;
std::condition_variable cv;
std::mutex m;
std::unique_lock<std::mutex> lk(m);
auto coro = env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& c) {
started = true;
cv.notify_all();
c->yield();
finished = true;
shouldStop = c->shouldStop();
cv.notify_all();
});
cv.wait_for(lk, 5s, [&]() { return started; });
env.app().getJobQueue().stop();
cv.wait_for(lk, 5s, [&]() { return finished; });
BEAST_EXPECT(finished);
BEAST_EXPECT(shouldStop.has_value() && *shouldStop == true);
}
void
coroutineGetsDestroyedBeforeExecuting()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("Coroutine gets destroyed before executing");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
{
auto coro = std::make_shared<JobQueue::Coro>(
Coro_create_t{},
env.app().getJobQueue(),
JobType::jtCLIENT,
"test",
[](auto coro) {
});
}
pass();
}
void
run() override
{
correct_order();
incorrect_order();
thread_specific_storage();
stopJobQueueWhenCoroutineSuspended();
coroutineGetsDestroyedBeforeExecuting();
}
};

View File

@@ -67,8 +67,6 @@ class JobQueue_test : public beast::unit_test::suite
while (yieldCount == 0)
;
coro->join();
// Now re-post until the Coro says it is done.
int old = yieldCount;
while (coro->runnable())
@@ -100,9 +98,6 @@ class JobQueue_test : public beast::unit_test::suite
return;
}
while (yieldCount == 0)
; // We should wait for the job to start and yield
// Wait for the Job to run and yield.
coro->join();

View File

@@ -109,17 +109,21 @@ doRipplePathFind(RPC::JsonContext& context)
// May 2017
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[coro = context.coro]() {
// Capturing the shared_ptr keeps the coroutine alive up
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coro->post().
// When post() fails, we won't get a thread to let
// the Coro finish. We should ignore the coroutine and
// let it destruct, as the JobQueue has been signaled to
// close, and resuming it manually messes up the internal
// state in JobQueue.
coro->post();
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
}
},
context.consumer,
lpLedger,
@@ -127,14 +131,6 @@ doRipplePathFind(RPC::JsonContext& context)
if (request)
{
context.coro->yield();
// Each time after we resume from yield(), we should
// check if cancellation has been requested. It would
// be a lot more elegant if we replace boost coroutine
// with c++ standard coroutine.
if (context.coro->shouldStop())
{
return jvResult;
}
jvResult = request->doStatus(context.params);
}