Compare commits

..

2 Commits

Author SHA1 Message Date
Ed Hennis
8dcbcb22c2 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-04-10 12:12:54 -04:00
Ed Hennis
f44dfc89cc refactor: acquireAsync will dispatch a job, not the other way around
- Improve job queue collision checks and logging
    - Improve logging related to ledger acquisition and operating mode
      changes
    - Class "CanProcess" to keep track of processing of distinct items
2026-04-10 10:31:21 -04:00
24 changed files with 499 additions and 127 deletions

View File

@@ -76,7 +76,7 @@ jobs:
name: ${{ inputs.config_name }}
runs-on: ${{ fromJSON(inputs.runs_on) }}
container: ${{ inputs.image != '' && inputs.image || null }}
timeout-minutes: 60
timeout-minutes: ${{ inputs.sanitizers != '' && 360 || 60 }}
env:
# Use a namespace to keep the objects separate for each configuration.
CCACHE_NAMESPACE: ${{ inputs.config_name }}
@@ -227,11 +227,17 @@ jobs:
- name: Set sanitizer options
if: ${{ !inputs.build_only && env.SANITIZERS_ENABLED == 'true' }}
env:
CONFIG_NAME: ${{ inputs.config_name }}
run: |
echo "ASAN_OPTIONS=print_stacktrace=1:detect_container_overflow=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/asan.supp" >> ${GITHUB_ENV}
echo "TSAN_OPTIONS=second_deadlock_stack=1:halt_on_error=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/tsan.supp" >> ${GITHUB_ENV}
echo "UBSAN_OPTIONS=suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/ubsan.supp" >> ${GITHUB_ENV}
echo "LSAN_OPTIONS=suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/lsan.supp" >> ${GITHUB_ENV}
ASAN_OPTS="include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-asan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/asan.supp"
if [[ "${CONFIG_NAME}" == *gcc* ]]; then
ASAN_OPTS="${ASAN_OPTS}:alloc_dealloc_mismatch=0"
fi
echo "ASAN_OPTIONS=${ASAN_OPTS}" >> ${GITHUB_ENV}
echo "TSAN_OPTIONS=include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-tsan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/tsan.supp" >> ${GITHUB_ENV}
echo "UBSAN_OPTIONS=include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-ubsan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/ubsan.supp" >> ${GITHUB_ENV}
echo "LSAN_OPTIONS=include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-lsan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/lsan.supp" >> ${GITHUB_ENV}
- name: Run the separate tests
if: ${{ !inputs.build_only }}

View File

@@ -23,7 +23,7 @@ target_compile_definitions(
BOOST_FILESYSTEM_NO_DEPRECATED
>
$<$<NOT:$<BOOL:${boost_show_deprecated}>>:
BOOST_COROUTINES_NO_DEPRECATION_WARNING
BOOST_COROUTINES2_NO_DEPRECATION_WARNING
BOOST_BEAST_ALLOW_DEPRECATED
BOOST_FILESYSTEM_DEPRECATED
>

View File

@@ -7,7 +7,7 @@ find_package(
COMPONENTS
chrono
container
coroutine
context
date_time
filesystem
json
@@ -26,7 +26,7 @@ target_link_libraries(
Boost::headers
Boost::chrono
Boost::container
Boost::coroutine
Boost::context
Boost::date_time
Boost::filesystem
Boost::json
@@ -38,23 +38,26 @@ target_link_libraries(
if(Boost_COMPILER)
target_link_libraries(xrpl_boost INTERFACE Boost::disable_autolinking)
endif()
if(SANITIZERS_ENABLED AND is_clang)
# TODO: gcc does not support -fsanitize-blacklist...can we do something else for gcc ?
if(NOT Boost_INCLUDE_DIRS AND TARGET Boost::headers)
get_target_property(
Boost_INCLUDE_DIRS
Boost::headers
INTERFACE_INCLUDE_DIRECTORIES
)
endif()
message(STATUS "Adding [${Boost_INCLUDE_DIRS}] to sanitizer blacklist")
file(
WRITE ${CMAKE_CURRENT_BINARY_DIR}/san_bl.txt
"src:${Boost_INCLUDE_DIRS}/*"
)
target_compile_options(
opts
INTERFACE # ignore boost headers for sanitizing
-fsanitize-blacklist=${CMAKE_CURRENT_BINARY_DIR}/san_bl.txt
# GCC 14+ has a false positive -Wuninitialized warning in Boost.Coroutine2's
# state.hpp when compiled with -O3. This is due to GCC's intentional behavior
# change (Bug #98871, #119388) where warnings from inlined system header code
# are no longer suppressed by -isystem. The warning occurs in operator|= in
# boost/coroutine2/detail/state.hpp when inlined from push_control_block::destroy().
# See: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=119388
if(is_gcc AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 14)
target_compile_options(xrpl_boost INTERFACE -Wno-uninitialized)
endif()
# Boost.Context's ucontext backend has ASAN fiber-switching annotations
# (start/finish_switch_fiber) that are compiled in when BOOST_USE_ASAN is defined.
# This tells ASAN about coroutine stack switches, preventing false positive
# stack-use-after-scope errors. BOOST_USE_UCONTEXT ensures the ucontext backend
# is selected (fcontext does not support ASAN annotations).
# These defines must match what Boost was compiled with (see conan/profiles/sanitizers).
if(enable_asan)
target_compile_definitions(
xrpl_boost
INTERFACE BOOST_USE_ASAN BOOST_USE_UCONTEXT
)
endif()

View File

@@ -7,16 +7,21 @@ include(default)
{% if compiler == "gcc" %}
{% if "address" in sanitizers or "thread" in sanitizers or "undefinedbehavior" in sanitizers %}
{% set sanitizer_list = [] %}
{% set defines = [] %}
{% set model_code = "" %}
{% set extra_cxxflags = ["-fno-omit-frame-pointer", "-O1", "-Wno-stringop-overflow"] %}
{% if "address" in sanitizers %}
{% set _ = sanitizer_list.append("address") %}
{% set model_code = "-mcmodel=large" %}
{% set _ = defines.append("BOOST_USE_ASAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% elif "thread" in sanitizers %}
{% set _ = sanitizer_list.append("thread") %}
{% set model_code = "-mcmodel=medium" %}
{% set _ = extra_cxxflags.append("-Wno-tsan") %}
{% set _ = defines.append("BOOST_USE_TSAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% endif %}
{% if "undefinedbehavior" in sanitizers %}
@@ -29,16 +34,22 @@ include(default)
tools.build:cxxflags+=['{{sanitizer_flags}} {{" ".join(extra_cxxflags)}}']
tools.build:sharedlinkflags+=['{{sanitizer_flags}}']
tools.build:exelinkflags+=['{{sanitizer_flags}}']
tools.build:defines+={{defines}}
{% endif %}
{% elif compiler == "apple-clang" or compiler == "clang" %}
{% if "address" in sanitizers or "thread" in sanitizers or "undefinedbehavior" in sanitizers %}
{% set sanitizer_list = [] %}
{% set defines = [] %}
{% set extra_cxxflags = ["-fno-omit-frame-pointer", "-O1"] %}
{% if "address" in sanitizers %}
{% set _ = sanitizer_list.append("address") %}
{% set _ = defines.append("BOOST_USE_ASAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% elif "thread" in sanitizers %}
{% set _ = sanitizer_list.append("thread") %}
{% set _ = defines.append("BOOST_USE_TSAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% endif %}
{% if "undefinedbehavior" in sanitizers %}
@@ -52,8 +63,24 @@ include(default)
tools.build:cxxflags+=['{{sanitizer_flags}} {{" ".join(extra_cxxflags)}}']
tools.build:sharedlinkflags+=['{{sanitizer_flags}}']
tools.build:exelinkflags+=['{{sanitizer_flags}}']
tools.build:defines+={{defines}}
{% endif %}
{% endif %}
{% endif %}
tools.info.package_id:confs+=["tools.build:cxxflags", "tools.build:exelinkflags", "tools.build:sharedlinkflags"]
tools.info.package_id:confs+=["tools.build:cxxflags", "tools.build:exelinkflags", "tools.build:sharedlinkflags", "tools.build:defines"]
[options]
{% if sanitizers %}
{% if "address" in sanitizers %}
# Build Boost.Context with ucontext backend (not fcontext) so that
# ASAN fiber-switching annotations (__sanitizer_start/finish_switch_fiber)
# are compiled into the library. fcontext (assembly) has no ASAN support.
# define=BOOST_USE_ASAN=1 is critical: it must be defined when building
# Boost.Context itself so the ucontext backend compiles in the ASAN annotations.
boost/*:extra_b2_flags=context-impl=ucontext address-sanitizer=on define=BOOST_USE_ASAN=1
boost/*:without_context=False
# Boost stacktrace fails to build with some sanitizers
boost/*:without_stacktrace=True
{% endif %}
{% endif %}

View File

@@ -1,3 +1,4 @@
import os
import re
from conan.tools.cmake import CMake, CMakeToolchain, cmake_layout
@@ -56,6 +57,9 @@ class Xrpl(ConanFile):
"tests": False,
"unity": False,
"xrpld": False,
"boost/*:without_context": False,
"boost/*:without_coroutine": True,
"boost/*:without_coroutine2": False,
"date/*:header_only": True,
"ed25519/*:shared": False,
"grpc/*:shared": False,
@@ -125,6 +129,12 @@ class Xrpl(ConanFile):
if self.settings.compiler in ["clang", "gcc"]:
self.options["boost"].without_cobalt = True
# Check if environment variable exists
if "SANITIZERS" in os.environ:
sanitizers = os.environ["SANITIZERS"]
if "address" in sanitizers.lower():
self.default_options["fPIC"] = False
def requirements(self):
self.requires("boost/1.90.0", force=True, transitive_headers=True)
self.requires("date/3.0.4", transitive_headers=True)
@@ -191,7 +201,7 @@ class Xrpl(ConanFile):
"boost::headers",
"boost::chrono",
"boost::container",
"boost::coroutine",
"boost::context",
"boost::date_time",
"boost::filesystem",
"boost::json",

View File

@@ -104,6 +104,7 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fmtdur

View File

@@ -0,0 +1,139 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}
~CanProcess()
{
if (cleanup_)
cleanup_();
}
CanProcess(CanProcess const&) = delete;
CanProcess&
operator=(CanProcess const&) = delete;
explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}
private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}
// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}
// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}
// If set, then the item is "usable"
std::function<void()> cleanup_;
};
#endif

View File

@@ -1,7 +1,5 @@
#pragma once
#include <xrpl/basics/ByteUtilities.h>
namespace xrpl {
template <class F>
@@ -10,16 +8,18 @@ JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string cons
, type_(type)
, name_(name)
, coro_(
// Stack size of 1MB wasn't sufficient for deep calls. ASAN tests flagged the issue. Hence
// increasing the size to 1.5MB.
boost::context::protected_fixedsize_stack(1536 * 1024),
[this, fn = std::forward<F>(f)](
boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
boost::coroutines2::asymmetric_coroutine<void>::push_type& do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
},
boost::coroutines::attributes(megabytes(1)))
})
{
}

View File

@@ -199,7 +199,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 1: true if the key is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>

View File

@@ -7,7 +7,8 @@
#include <xrpl/core/detail/Workers.h>
#include <xrpl/json/json_value.h>
#include <boost/coroutine/all.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <boost/coroutine2/all.hpp>
#include <set>
@@ -48,8 +49,8 @@ public:
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
boost::coroutines2::coroutine<void>::pull_type coro_;
boost::coroutines2::coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif

View File

@@ -35,6 +35,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -185,7 +185,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
setMode(OperatingMode om, char const* reason) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -1,15 +1,6 @@
# The idea is to empty this file gradually by fixing the underlying issues and removing suppressions.
#
# ASAN_OPTIONS="print_stacktrace=1:detect_container_overflow=0:suppressions=sanitizers/suppressions/asan.supp:halt_on_error=0"
#
# The detect_container_overflow=0 option disables false positives from:
# - Boost intrusive containers (slist_iterator.hpp, hashtable.hpp, aged_unordered_container.h)
# - Boost context/coroutine stack switching (Workers.cpp, thread.h)
#
# See: https://github.com/google/sanitizers/wiki/AddressSanitizerContainerOverflow
# Boost
interceptor_name:boost/asio
# Suppress false positive stack-buffer errors in thread stack allocation
# Related to ASan's __asan_handle_no_return warnings (github.com/google/sanitizers/issues/189)

View File

@@ -85,7 +85,12 @@ public:
}
virtual void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}

View File

@@ -0,0 +1,165 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2016 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/CanProcess.h>
#include <xrpl/beast/unit_test.h>
#include <memory>
namespace ripple {
namespace test {
struct CanProcess_test : beast::unit_test::suite
{
template <class Mutex, class Collection, class Item>
void
test(
std::string const& name,
Mutex& mtx,
Collection& collection,
std::vector<Item> const& items)
{
testcase(name);
if (!BEAST_EXPECT(!items.empty()))
return;
if (!BEAST_EXPECT(collection.empty()))
return;
// CanProcess objects can't be copied or moved. To make that easier,
// store shared_ptrs
std::vector<std::shared_ptr<CanProcess>> trackers;
// Fill up the vector with two CanProcess for each Item. The first
// inserts the item into the collection and is "good". The second does
// not and is "bad".
for (int i = 0; i < items.size(); ++i)
{
{
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
{
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * (i + 1));
BEAST_EXPECT(collection.size() == i + 1);
}
BEAST_EXPECT(collection.size() == items.size());
// Now remove the items from the vector<CanProcess> two at a time, and
// try to get another CanProcess for that item.
for (int i = 0; i < items.size(); ++i)
{
// Remove the "bad" one in the second position
// This will have no effect on the collection
{
auto const iter = trackers.begin() + 1;
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size());
{
// Append a new "bad" one
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
// Remove the "good" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size() - 1);
{
// Append a new "good" one
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
}
// Now remove them all two at a time
for (int i = items.size() - 1; i >= 0; --i)
{
// Remove the "bad" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
// Remove the "good" one now in front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == 2 * i);
BEAST_EXPECT(collection.size() == i);
}
BEAST_EXPECT(trackers.empty());
BEAST_EXPECT(collection.empty());
}
void
run() override
{
{
std::mutex m;
std::set<int> collection;
std::vector<int> const items{1, 2, 3, 4, 5};
test("set of int", m, collection, items);
}
{
std::mutex m;
std::set<std::string> collection;
std::vector<std::string> const items{"one", "two", "three", "four", "five"};
test("set of string", m, collection, items);
}
{
std::mutex m;
std::unordered_set<char> collection;
std::vector<char> const items{'1', '2', '3', '4', '5'};
test("unorderd_set of char", m, collection, items);
}
{
std::mutex m;
std::unordered_set<std::uint64_t> collection;
std::vector<std::uint64_t> const items{100u, 1000u, 150u, 4u, 0u};
test("unordered_set of uint64_t", m, collection, items);
}
}
};
BEAST_DEFINE_TESTSUITE(CanProcess, ripple_basics, ripple);
} // namespace test
} // namespace ripple

View File

@@ -105,10 +105,8 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
// Tell the ledger acquire system that we need the consensus ledger
acquiringLedger_ = hash;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL1", hash, 0, InboundLedger::Reason::CONSENSUS);
}
return std::nullopt;
}
@@ -998,7 +996,7 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if ((positions == 0u) && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}
void

View File

@@ -116,12 +116,8 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
{
JLOG(j_.warn()) << "Need validated ledger for preferred ledger analysis " << hash;
Application* pApp = &app_;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL2", [pApp, hash, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(hash, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL2", hash, 0, InboundLedger::Reason::CONSENSUS);
return std::nullopt;
}

View File

@@ -26,7 +26,12 @@ public:
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) = 0;
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

View File

@@ -353,7 +353,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
mByHash = true;

View File

@@ -2,6 +2,7 @@
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
@@ -58,12 +59,15 @@ public:
(reason != InboundLedger::Reason::CONSENSUS))
return {};
std::stringstream ss;
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -82,47 +86,61 @@ public:
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger =
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
}
void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
if (auto check = std::make_shared<CanProcess const>(acquiresMutex_, pendingAcquires_, hash);
*check)
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock const unlock(lock);
acquire(hash, seq, reason);
app_.getJobQueue().addJob(type, name, [check, name, hash, seq, reason, this]() {
JLOG(j_.debug()) << "JOB acquireAsync " << name << " started ";
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new "
"inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
});
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
<< e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -924,8 +924,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
return;
}
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq
<< " with >= " << minVal << " validations";
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
<< to_short_string(ledger->header().hash) << ") with >= " << minVal
<< " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -13,7 +13,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, hash_(hash)
, timerInterval_(interval)
, queueJobParameter_(std::move(jobParameter))
@@ -29,6 +30,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
@@ -36,6 +38,10 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -3,6 +3,7 @@
#include <xrpld/app/main/Application.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -103,6 +104,7 @@ protected:
// Used in this class for access to boost::asio::io_context and
// xrpl::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -30,10 +30,10 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/core/NetworkIDService.h>
@@ -401,7 +401,7 @@ public:
isFull() override;
void
setMode(OperatingMode om) override;
setMode(OperatingMode om, char const* reason) override;
bool
isBlocked() override;
@@ -839,7 +839,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "setStandAlone");
}
inline void
@@ -982,7 +982,7 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1006,7 +1006,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
}
@@ -1017,11 +1017,11 @@ NetworkOPsImp::processHeartbeatTimer()
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
{
setMode(OperatingMode::SYNCING);
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
}
else if (mMode == OperatingMode::CONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
}
auto newMode = mMode.load();
if (origMode != newMode)
@@ -1726,7 +1726,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
}
inline bool
@@ -1757,7 +1757,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
}
inline void
@@ -1857,7 +1857,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
}
if (consensus)
@@ -1945,8 +1945,8 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
JLOG(m_journal.warn()) << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2074,7 +2074,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
}
if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) &&
@@ -2087,7 +2087,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (registry_.get().getTimeKeeper().now() <
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "endConsensus: check full");
}
}
@@ -2099,7 +2099,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "consensusViewChange");
}
}
@@ -2403,7 +2403,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om)
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2423,11 +2423,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mMode == om)
return;
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om;
accounting_.mode(om);
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer();
}
@@ -2436,36 +2437,24 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
{
JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
try
{
bypassAccept = BypassAccept::yes;
BypassAccept bypassAccept = check ? BypassAccept::no : BypassAccept::yes;
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
}
else
catch (std::exception const& e)
{
pendingValidations_.insert(val->getLedgerHash());
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation " << val->getLedgerHash();
}
scope_unlock const unlock(lock);
handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);