Compare commits

..

5 Commits

Author SHA1 Message Date
Michael Legleux
1e01cd34f7 Set version to 2.5.0 2025-06-23 10:13:01 -07:00
Alex Kremer
e2fa5c1b7c chore: Change libXRPL check conan remote to dev (#5482)
This change aligns the Conan remote used by the libXRPL Clio compatibility check workflow with the recent changes applied to Clio.
2025-06-20 17:02:16 +00:00
Ed Hennis
fc0984d286 Require a message on "Application::signalStop" (#5255)
This change adds a message parameter to Application::signalStop for extra context.
2025-06-20 16:24:34 +00:00
Valentin Balaschenko
8b3dcd41f7 refactor: Change getNodeFat Missing Node State Tree error into warning (#5455) 2025-06-20 15:44:42 +00:00
Denis Angell
8f2f5310e2 Fix: Improve error handling in Batch RPC response (#5503) 2025-06-18 17:46:45 -04:00
28 changed files with 624 additions and 429 deletions

View File

@@ -1,6 +1,6 @@
name: Check libXRPL compatibility with Clio
env:
CONAN_URL: http://18.143.149.228:8081/artifactory/api/conan/conan-non-prod
CONAN_URL: http://18.143.149.228:8081/artifactory/api/conan/dev
CONAN_LOGIN_USERNAME_RIPPLE: ${{ secrets.CONAN_USERNAME }}
CONAN_PASSWORD_RIPPLE: ${{ secrets.CONAN_TOKEN }}
on:

View File

@@ -940,7 +940,23 @@
#
# path Location to store the database
#
# Optional keys for NuDB and RocksDB:
# Optional keys
#
# cache_size Size of cache for database records. Default is 16384.
# Setting this value to 0 will use the default value.
#
# cache_age Length of time in minutes to keep database records
# cached. Default is 5 minutes. Setting this value to
# 0 will use the default value.
#
# Note: if neither cache_size nor cache_age is
# specified, the cache for database records will not
# be created. If only one of cache_size or cache_age
# is specified, the cache will be created using the
# default value for the unspecified parameter.
#
# Note: the cache will not be created if online_delete
# is specified.
#
# fast_load Boolean. If set, load the last persisted ledger
# from disk upon process start before syncing to
@@ -948,6 +964,8 @@
# if sufficient IOPS capacity is available.
# Default 0.
#
# Optional keys for NuDB or RocksDB:
#
# earliest_seq The default is 32570 to match the XRP ledger
# network's earliest allowed sequence. Alternate
# networks may set this value. Minimum value of 1.

View File

@@ -21,6 +21,7 @@
#define RIPPLE_BASICS_SHAMAP_HASH_H_INCLUDED
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/partitioned_unordered_map.h>
#include <ostream>

View File

@@ -170,6 +170,9 @@ public:
bool
retrieve(key_type const& key, T& data);
mutex_type&
peekMutex();
std::vector<key_type>
getKeys() const;
@@ -190,14 +193,11 @@ public:
private:
SharedPointerType
initialFetch(key_type const& key);
initialFetch(key_type const& key, std::lock_guard<mutex_type> const& l);
void
collect_metrics();
Mutex&
lockPartition(key_type const& key) const;
private:
struct Stats
{
@@ -300,8 +300,8 @@ private:
[[maybe_unused]] clock_type::time_point const& now,
typename KeyValueCacheType::map_type& partition,
SweptPointersVector& stuffToSweep,
std::atomic<int>& allRemoval,
Mutex& partitionLock);
std::atomic<int>& allRemovals,
std::lock_guard<std::recursive_mutex> const&);
[[nodiscard]] std::thread
sweepHelper(
@@ -310,12 +310,14 @@ private:
typename KeyOnlyCacheType::map_type& partition,
SweptPointersVector&,
std::atomic<int>& allRemovals,
Mutex& partitionLock);
std::lock_guard<std::recursive_mutex> const&);
beast::Journal m_journal;
clock_type& m_clock;
Stats m_stats;
mutex_type mutable m_mutex;
// Used for logging
std::string m_name;
@@ -326,11 +328,10 @@ private:
clock_type::duration const m_target_age;
// Number of items cached
std::atomic<int> m_cache_count;
int m_cache_count;
cache_type m_cache; // Hold strong reference to recent objects
std::atomic<std::uint64_t> m_hits;
std::atomic<std::uint64_t> m_misses;
mutable std::vector<mutex_type> partitionLocks_;
std::uint64_t m_hits;
std::uint64_t m_misses;
};
} // namespace ripple

View File

@@ -60,7 +60,6 @@ inline TaggedCache<
, m_hits(0)
, m_misses(0)
{
partitionLocks_ = std::vector<mutex_type>(m_cache.partitions());
}
template <
@@ -106,13 +105,8 @@ TaggedCache<
KeyEqual,
Mutex>::size() const
{
std::size_t totalSize = 0;
for (size_t i = 0; i < partitionLocks_.size(); ++i)
{
std::lock_guard<Mutex> lock(partitionLocks_[i]);
totalSize += m_cache.map()[i].size();
}
return totalSize;
std::lock_guard lock(m_mutex);
return m_cache.size();
}
template <
@@ -135,7 +129,8 @@ TaggedCache<
KeyEqual,
Mutex>::getCacheSize() const
{
return m_cache_count.load(std::memory_order_relaxed);
std::lock_guard lock(m_mutex);
return m_cache_count;
}
template <
@@ -158,7 +153,8 @@ TaggedCache<
KeyEqual,
Mutex>::getTrackSize() const
{
return size();
std::lock_guard lock(m_mutex);
return m_cache.size();
}
template <
@@ -181,10 +177,9 @@ TaggedCache<
KeyEqual,
Mutex>::getHitRate()
{
auto hits = m_hits.load(std::memory_order_relaxed);
auto misses = m_misses.load(std::memory_order_relaxed);
float total = float(hits + misses);
return hits * (100.0f / std::max(1.0f, total));
std::lock_guard lock(m_mutex);
auto const total = static_cast<float>(m_hits + m_misses);
return m_hits * (100.0f / std::max(1.0f, total));
}
template <
@@ -207,12 +202,9 @@ TaggedCache<
KeyEqual,
Mutex>::clear()
{
for (auto& mutex : partitionLocks_)
mutex.lock();
std::lock_guard lock(m_mutex);
m_cache.clear();
for (auto& mutex : partitionLocks_)
mutex.unlock();
m_cache_count.store(0, std::memory_order_relaxed);
m_cache_count = 0;
}
template <
@@ -235,14 +227,11 @@ TaggedCache<
KeyEqual,
Mutex>::reset()
{
for (auto& mutex : partitionLocks_)
mutex.lock();
std::lock_guard lock(m_mutex);
m_cache.clear();
for (auto& mutex : partitionLocks_)
mutex.unlock();
m_cache_count.store(0, std::memory_order_relaxed);
m_hits.store(0, std::memory_order_relaxed);
m_misses.store(0, std::memory_order_relaxed);
m_cache_count = 0;
m_hits = 0;
m_misses = 0;
}
template <
@@ -266,7 +255,7 @@ TaggedCache<
KeyEqual,
Mutex>::touch_if_exists(KeyComparable const& key)
{
std::lock_guard<Mutex> lock(lockPartition(key));
std::lock_guard lock(m_mutex);
auto const iter(m_cache.find(key));
if (iter == m_cache.end())
{
@@ -308,9 +297,26 @@ TaggedCache<
auto const start = std::chrono::steady_clock::now();
{
when_expire =
now + std::chrono::hours(1); // any future time works too to make
// sure that nothing survives
std::lock_guard lock(m_mutex);
if (m_target_size == 0 ||
(static_cast<int>(m_cache.size()) <= m_target_size))
{
when_expire = now - m_target_age;
}
else
{
when_expire = now - m_target_age * m_target_size / m_cache.size();
clock_type::duration const minimumAge(std::chrono::seconds(1));
if (when_expire > (now - minimumAge))
when_expire = now - minimumAge;
JLOG(m_journal.trace())
<< m_name << " is growing fast " << m_cache.size() << " of "
<< m_target_size << " aging at " << (now - when_expire).count()
<< " of " << m_target_age.count();
}
std::vector<std::thread> workers;
workers.reserve(m_cache.partitions());
@@ -324,13 +330,12 @@ TaggedCache<
m_cache.map()[p],
allStuffToSweep[p],
allRemovals,
partitionLocks_[p]));
lock));
}
for (std::thread& worker : workers)
worker.join();
int removals = allRemovals.load(std::memory_order_relaxed);
m_cache_count.fetch_sub(removals, std::memory_order_relaxed);
m_cache_count -= allRemovals;
}
// At this point allStuffToSweep will go out of scope outside the lock
// and decrement the reference count on each strong pointer.
@@ -364,8 +369,7 @@ TaggedCache<
{
// Remove from cache, if !valid, remove from map too. Returns true if
// removed from cache
std::lock_guard<Mutex> lock(lockPartition(key));
std::lock_guard lock(m_mutex);
auto cit = m_cache.find(key);
@@ -378,7 +382,7 @@ TaggedCache<
if (entry.isCached())
{
m_cache_count.fetch_sub(1, std::memory_order_relaxed);
--m_cache_count;
entry.ptr.convertToWeak();
ret = true;
}
@@ -416,16 +420,17 @@ TaggedCache<
{
// Return canonical value, store if needed, refresh in cache
// Return values: true=we had the data already
std::lock_guard lock(m_mutex);
std::lock_guard<Mutex> lock(lockPartition(key));
auto cit = m_cache.find(key);
if (cit == m_cache.end())
{
m_cache.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple(m_clock.now(), data));
m_cache_count.fetch_add(1, std::memory_order_relaxed);
++m_cache_count;
return false;
}
@@ -474,12 +479,12 @@ TaggedCache<
data = cachedData;
}
m_cache_count.fetch_add(1, std::memory_order_relaxed);
++m_cache_count;
return true;
}
entry.ptr = data;
m_cache_count.fetch_add(1, std::memory_order_relaxed);
++m_cache_count;
return false;
}
@@ -555,11 +560,10 @@ TaggedCache<
KeyEqual,
Mutex>::fetch(key_type const& key)
{
std::lock_guard<Mutex> lock(lockPartition(key));
auto ret = initialFetch(key);
std::lock_guard<mutex_type> l(m_mutex);
auto ret = initialFetch(key, l);
if (!ret)
m_misses.fetch_add(1, std::memory_order_relaxed);
++m_misses;
return ret;
}
@@ -623,8 +627,8 @@ TaggedCache<
Mutex>::insert(key_type const& key)
-> std::enable_if_t<IsKeyCache, ReturnType>
{
std::lock_guard lock(m_mutex);
clock_type::time_point const now(m_clock.now());
std::lock_guard<Mutex> lock(lockPartition(key));
auto [it, inserted] = m_cache.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),
@@ -664,6 +668,29 @@ TaggedCache<
return true;
}
template <
class Key,
class T,
bool IsKeyCache,
class SharedWeakUnionPointer,
class SharedPointerType,
class Hash,
class KeyEqual,
class Mutex>
inline auto
TaggedCache<
Key,
T,
IsKeyCache,
SharedWeakUnionPointer,
SharedPointerType,
Hash,
KeyEqual,
Mutex>::peekMutex() -> mutex_type&
{
return m_mutex;
}
template <
class Key,
class T,
@@ -687,13 +714,10 @@ TaggedCache<
std::vector<key_type> v;
{
std::lock_guard lock(m_mutex);
v.reserve(m_cache.size());
for (std::size_t i = 0; i < partitionLocks_.size(); ++i)
{
std::lock_guard<Mutex> lock(partitionLocks_[i]);
for (auto const& entry : m_cache.map()[i])
v.push_back(entry.first);
}
for (auto const& _ : m_cache)
v.push_back(_.first);
}
return v;
@@ -719,12 +743,11 @@ TaggedCache<
KeyEqual,
Mutex>::rate() const
{
auto hits = m_hits.load(std::memory_order_relaxed);
auto misses = m_misses.load(std::memory_order_relaxed);
auto const tot = hits + misses;
std::lock_guard lock(m_mutex);
auto const tot = m_hits + m_misses;
if (tot == 0)
return 0.0;
return double(hits) / tot;
return 0;
return double(m_hits) / tot;
}
template <
@@ -748,16 +771,18 @@ TaggedCache<
KeyEqual,
Mutex>::fetch(key_type const& digest, Handler const& h)
{
std::lock_guard<Mutex> lock(lockPartition(digest));
if (auto ret = initialFetch(digest))
return ret;
{
std::lock_guard l(m_mutex);
if (auto ret = initialFetch(digest, l))
return ret;
}
auto sle = h();
if (!sle)
return {};
m_misses.fetch_add(1, std::memory_order_relaxed);
std::lock_guard l(m_mutex);
++m_misses;
auto const [it, inserted] =
m_cache.emplace(digest, Entry(m_clock.now(), std::move(sle)));
if (!inserted)
@@ -784,10 +809,9 @@ TaggedCache<
SharedPointerType,
Hash,
KeyEqual,
Mutex>::initialFetch(key_type const& key)
Mutex>::
initialFetch(key_type const& key, std::lock_guard<mutex_type> const& l)
{
std::lock_guard<Mutex> lock(lockPartition(key));
auto cit = m_cache.find(key);
if (cit == m_cache.end())
return {};
@@ -795,7 +819,7 @@ TaggedCache<
Entry& entry = cit->second;
if (entry.isCached())
{
m_hits.fetch_add(1, std::memory_order_relaxed);
++m_hits;
entry.touch(m_clock.now());
return entry.ptr.getStrong();
}
@@ -803,13 +827,12 @@ TaggedCache<
if (entry.isCached())
{
// independent of cache size, so not counted as a hit
m_cache_count.fetch_add(1, std::memory_order_relaxed);
++m_cache_count;
entry.touch(m_clock.now());
return entry.ptr.getStrong();
}
m_cache.erase(cit); // TODO: if this erase happens on fetch, what is left
// for a sweep?
m_cache.erase(cit);
return {};
}
@@ -838,11 +861,10 @@ TaggedCache<
{
beast::insight::Gauge::value_type hit_rate(0);
{
auto const hits = m_hits.load(std::memory_order_relaxed);
auto const misses = m_misses.load(std::memory_order_relaxed);
auto const total = hits + misses;
std::lock_guard lock(m_mutex);
auto const total(m_hits + m_misses);
if (total != 0)
hit_rate = (hits * 100) / total;
hit_rate = (m_hits * 100) / total;
}
m_stats.hit_rate.set(hit_rate);
}
@@ -873,14 +895,12 @@ TaggedCache<
typename KeyValueCacheType::map_type& partition,
SweptPointersVector& stuffToSweep,
std::atomic<int>& allRemovals,
Mutex& partitionLock)
std::lock_guard<std::recursive_mutex> const&)
{
return std::thread([&, this]() {
int cacheRemovals = 0;
int mapRemovals = 0;
std::lock_guard<Mutex> lock(partitionLock);
// Keep references to all the stuff we sweep
// so that we can destroy them outside the lock.
stuffToSweep.reserve(partition.size());
@@ -964,14 +984,12 @@ TaggedCache<
typename KeyOnlyCacheType::map_type& partition,
SweptPointersVector&,
std::atomic<int>& allRemovals,
Mutex& partitionLock)
std::lock_guard<std::recursive_mutex> const&)
{
return std::thread([&, this]() {
int cacheRemovals = 0;
int mapRemovals = 0;
std::lock_guard<Mutex> lock(partitionLock);
// Keep references to all the stuff we sweep
// so that we can destroy them outside the lock.
{
@@ -1006,29 +1024,6 @@ TaggedCache<
});
}
template <
class Key,
class T,
bool IsKeyCache,
class SharedWeakUnionPointer,
class SharedPointerType,
class Hash,
class KeyEqual,
class Mutex>
inline Mutex&
TaggedCache<
Key,
T,
IsKeyCache,
SharedWeakUnionPointer,
SharedPointerType,
Hash,
KeyEqual,
Mutex>::lockPartition(key_type const& key) const
{
return partitionLocks_[m_cache.partition_index(key)];
}
} // namespace ripple
#endif

View File

@@ -277,12 +277,6 @@ public:
return map_;
}
partition_map_type const&
map() const
{
return map_;
}
iterator
begin()
{
@@ -327,12 +321,6 @@ public:
return cend();
}
std::size_t
partition_index(key_type const& key) const
{
return partitioner(key);
}
private:
template <class T>
void
@@ -392,7 +380,7 @@ public:
clear()
{
for (auto& p : map_)
p.clear(); // TODO make sure that it is locked inside
p.clear();
}
iterator
@@ -418,7 +406,7 @@ public:
{
std::size_t ret = 0;
for (auto& p : map_)
ret += p.size(); // TODO make sure that it is locked inside
ret += p.size();
return ret;
}

View File

@@ -22,6 +22,7 @@
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/partitioned_unordered_map.h>
#include <cstdint>

View File

@@ -36,7 +36,7 @@ namespace BuildInfo {
// and follow the format described at http://semver.org/
//------------------------------------------------------------------------------
// clang-format off
char const* const versionString = "2.5.0-rc2"
char const* const versionString = "2.5.0"
// clang-format on
#if defined(DEBUG) || defined(SANITIZER)

View File

@@ -760,6 +760,12 @@ isRawTransactionOkay(STObject const& st, std::string& reason)
{
TxType const tt =
safe_cast<TxType>(raw.getFieldU16(sfTransactionType));
if (tt == ttBATCH)
{
reason = "Raw Transactions may not contain batch transactions.";
return false;
}
raw.applyTemplate(getTxFormat(tt)->getSOTemplate());
}
catch (std::exception const& e)

View File

@@ -24,6 +24,7 @@
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/app/misc/Transaction.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/app/tx/detail/Batch.h>
#include <xrpl/protocol/Batch.h>
#include <xrpl/protocol/Feature.h>
@@ -317,7 +318,8 @@ class Batch_test : public beast::unit_test::suite
env.close();
}
// temINVALID: Batch: batch cannot have inner batch txn.
// DEFENSIVE: temINVALID: Batch: batch cannot have inner batch txn.
// ACTUAL: telENV_RPC_FAILED: isRawTransactionOkay()
{
auto const seq = env.seq(alice);
auto const batchFee = batch::calcBatchFee(env, 0, 2);
@@ -325,7 +327,7 @@ class Batch_test : public beast::unit_test::suite
batch::inner(
batch::outer(alice, seq, batchFee, tfAllOrNothing), seq),
batch::inner(pay(alice, bob, XRP(1)), seq + 2),
ter(temINVALID));
ter(telENV_RPC_FAILED));
env.close();
}
@@ -3953,6 +3955,176 @@ class Batch_test : public beast::unit_test::suite
}
}
void
testValidateRPCResponse(FeatureBitset features)
{
// Verifying that the RPC response from submit includes
// the account_sequence_available, account_sequence_next,
// open_ledger_cost and validated_ledger_index fields.
testcase("Validate RPC response");
using namespace jtx;
Env env(*this);
Account const alice("alice");
Account const bob("bob");
env.fund(XRP(10000), alice, bob);
env.close();
// tes
{
auto const baseFee = env.current()->fees().base;
auto const aliceSeq = env.seq(alice);
auto jtx = env.jt(pay(alice, bob, XRP(1)));
Serializer s;
jtx.stx->add(s);
auto const jr = env.rpc("submit", strHex(s.slice()))[jss::result];
env.close();
BEAST_EXPECT(jr.isMember(jss::account_sequence_available));
BEAST_EXPECT(
jr[jss::account_sequence_available].asUInt() == aliceSeq + 1);
BEAST_EXPECT(jr.isMember(jss::account_sequence_next));
BEAST_EXPECT(
jr[jss::account_sequence_next].asUInt() == aliceSeq + 1);
BEAST_EXPECT(jr.isMember(jss::open_ledger_cost));
BEAST_EXPECT(jr[jss::open_ledger_cost] == to_string(baseFee));
BEAST_EXPECT(jr.isMember(jss::validated_ledger_index));
}
// tec failure
{
auto const baseFee = env.current()->fees().base;
auto const aliceSeq = env.seq(alice);
env(fset(bob, asfRequireDest));
auto jtx = env.jt(pay(alice, bob, XRP(1)), seq(aliceSeq));
Serializer s;
jtx.stx->add(s);
auto const jr = env.rpc("submit", strHex(s.slice()))[jss::result];
env.close();
BEAST_EXPECT(jr.isMember(jss::account_sequence_available));
BEAST_EXPECT(
jr[jss::account_sequence_available].asUInt() == aliceSeq + 1);
BEAST_EXPECT(jr.isMember(jss::account_sequence_next));
BEAST_EXPECT(
jr[jss::account_sequence_next].asUInt() == aliceSeq + 1);
BEAST_EXPECT(jr.isMember(jss::open_ledger_cost));
BEAST_EXPECT(jr[jss::open_ledger_cost] == to_string(baseFee));
BEAST_EXPECT(jr.isMember(jss::validated_ledger_index));
}
// tem failure
{
auto const baseFee = env.current()->fees().base;
auto const aliceSeq = env.seq(alice);
auto jtx = env.jt(pay(alice, bob, XRP(1)), seq(aliceSeq + 1));
Serializer s;
jtx.stx->add(s);
auto const jr = env.rpc("submit", strHex(s.slice()))[jss::result];
env.close();
BEAST_EXPECT(jr.isMember(jss::account_sequence_available));
BEAST_EXPECT(
jr[jss::account_sequence_available].asUInt() == aliceSeq);
BEAST_EXPECT(jr.isMember(jss::account_sequence_next));
BEAST_EXPECT(jr[jss::account_sequence_next].asUInt() == aliceSeq);
BEAST_EXPECT(jr.isMember(jss::open_ledger_cost));
BEAST_EXPECT(jr[jss::open_ledger_cost] == to_string(baseFee));
BEAST_EXPECT(jr.isMember(jss::validated_ledger_index));
}
}
void
testBatchCalculateBaseFee(FeatureBitset features)
{
using namespace jtx;
Env env(*this);
Account const alice("alice");
Account const bob("bob");
Account const carol("carol");
env.fund(XRP(10000), alice, bob, carol);
env.close();
auto getBaseFee = [&](JTx const& jtx) -> XRPAmount {
Serializer s;
jtx.stx->add(s);
return Batch::calculateBaseFee(*env.current(), *jtx.stx);
};
// bad: Inner Batch transaction found
{
auto const seq = env.seq(alice);
XRPAmount const batchFee = batch::calcBatchFee(env, 0, 2);
auto jtx = env.jt(
batch::outer(alice, seq, batchFee, tfAllOrNothing),
batch::inner(
batch::outer(alice, seq, batchFee, tfAllOrNothing), seq),
batch::inner(pay(alice, bob, XRP(1)), seq + 2));
XRPAmount const txBaseFee = getBaseFee(jtx);
BEAST_EXPECT(txBaseFee == XRPAmount(INITIAL_XRP));
}
// bad: Raw Transactions array exceeds max entries.
{
auto const seq = env.seq(alice);
XRPAmount const batchFee = batch::calcBatchFee(env, 0, 2);
auto jtx = env.jt(
batch::outer(alice, seq, batchFee, tfAllOrNothing),
batch::inner(pay(alice, bob, XRP(1)), seq + 1),
batch::inner(pay(alice, bob, XRP(1)), seq + 2),
batch::inner(pay(alice, bob, XRP(1)), seq + 3),
batch::inner(pay(alice, bob, XRP(1)), seq + 4),
batch::inner(pay(alice, bob, XRP(1)), seq + 5),
batch::inner(pay(alice, bob, XRP(1)), seq + 6),
batch::inner(pay(alice, bob, XRP(1)), seq + 7),
batch::inner(pay(alice, bob, XRP(1)), seq + 8),
batch::inner(pay(alice, bob, XRP(1)), seq + 9));
XRPAmount const txBaseFee = getBaseFee(jtx);
BEAST_EXPECT(txBaseFee == XRPAmount(INITIAL_XRP));
}
// bad: Signers array exceeds max entries.
{
auto const seq = env.seq(alice);
XRPAmount const batchFee = batch::calcBatchFee(env, 0, 2);
auto jtx = env.jt(
batch::outer(alice, seq, batchFee, tfAllOrNothing),
batch::inner(pay(alice, bob, XRP(10)), seq + 1),
batch::inner(pay(alice, bob, XRP(5)), seq + 2),
batch::sig(
bob,
carol,
alice,
bob,
carol,
alice,
bob,
carol,
alice,
alice));
XRPAmount const txBaseFee = getBaseFee(jtx);
BEAST_EXPECT(txBaseFee == XRPAmount(INITIAL_XRP));
}
// good:
{
auto const seq = env.seq(alice);
XRPAmount const batchFee = batch::calcBatchFee(env, 0, 2);
auto jtx = env.jt(
batch::outer(alice, seq, batchFee, tfAllOrNothing),
batch::inner(pay(alice, bob, XRP(1)), seq + 1),
batch::inner(pay(bob, alice, XRP(2)), seq + 2));
XRPAmount const txBaseFee = getBaseFee(jtx);
BEAST_EXPECT(txBaseFee == batchFee);
}
}
void
testWithFeats(FeatureBitset features)
{
@@ -3983,6 +4155,8 @@ class Batch_test : public beast::unit_test::suite
testBatchTxQueue(features);
testBatchNetworkOps(features);
testBatchDelegate(features);
testValidateRPCResponse(features);
testBatchCalculateBaseFee(features);
}
public:

View File

@@ -558,8 +558,23 @@ public:
Env env(*this, envconfig(onlineDelete));
/////////////////////////////////////////////////////////////
// Create NodeStore with two backends to allow online deletion of data.
// Normally, SHAMapStoreImp handles all these details.
// Create the backend. Normally, SHAMapStoreImp handles all these
// details
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
// Provide default values:
if (!nscfg.exists("cache_size"))
nscfg.set(
"cache_size",
std::to_string(env.app().config().getValueFor(
SizedItem::treeCacheSize, std::nullopt)));
if (!nscfg.exists("cache_age"))
nscfg.set(
"cache_age",
std::to_string(env.app().config().getValueFor(
SizedItem::treeCacheAge, std::nullopt)));
NodeStoreScheduler scheduler(env.app().getJobQueue());
std::string const writableDb = "write";
@@ -567,8 +582,9 @@ public:
auto writableBackend = makeBackendRotating(env, scheduler, writableDb);
auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);
// Create NodeStore with two backends to allow online deletion of
// data
constexpr int readThreads = 4;
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
scheduler,
readThreads,

View File

@@ -32,66 +32,64 @@ public:
void
run() override
{
// using namespace std::chrono_literals;
// TestStopwatch clock;
// clock.set(0);
using namespace std::chrono_literals;
TestStopwatch clock;
clock.set(0);
// using Key = std::string;
// using Cache = TaggedCache<Key, int, true>;
using Key = std::string;
using Cache = TaggedCache<Key, int, true>;
test::SuiteJournal j("KeyCacheTest", *this);
BEAST_EXPECT(true);
// Insert an item, retrieve it, and age it so it gets purged.
{
Cache c("test", LedgerIndex(1), 2s, clock, j);
// // Insert an item, retrieve it, and age it so it gets purged.
// {
// // Cache c("test", LedgerIndex(1), 2s, clock, j);
BEAST_EXPECT(c.size() == 0);
BEAST_EXPECT(c.insert("one"));
BEAST_EXPECT(!c.insert("one"));
BEAST_EXPECT(c.size() == 1);
BEAST_EXPECT(c.touch_if_exists("one"));
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 1);
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 0);
BEAST_EXPECT(!c.touch_if_exists("one"));
}
// // BEAST_EXPECT(c.size() == 0);
// // BEAST_EXPECT(c.insert("one"));
// // BEAST_EXPECT(!c.insert("one"));
// // BEAST_EXPECT(c.size() == 1);
// // BEAST_EXPECT(c.touch_if_exists("one"));
// // ++clock;
// // c.sweep();
// // BEAST_EXPECT(c.size() == 1);
// // ++clock;
// // c.sweep();
// // BEAST_EXPECT(c.size() == 0);
// // BEAST_EXPECT(!c.touch_if_exists("one"));
// }
// Insert two items, have one expire
{
Cache c("test", LedgerIndex(2), 2s, clock, j);
// // Insert two items, have one expire
// {
// // Cache c("test", LedgerIndex(2), 2s, clock, j);
BEAST_EXPECT(c.insert("one"));
BEAST_EXPECT(c.size() == 1);
BEAST_EXPECT(c.insert("two"));
BEAST_EXPECT(c.size() == 2);
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 2);
BEAST_EXPECT(c.touch_if_exists("two"));
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 1);
}
// // BEAST_EXPECT(c.insert("one"));
// // BEAST_EXPECT(c.size() == 1);
// // BEAST_EXPECT(c.insert("two"));
// // BEAST_EXPECT(c.size() == 2);
// // ++clock;
// // c.sweep();
// // BEAST_EXPECT(c.size() == 2);
// // BEAST_EXPECT(c.touch_if_exists("two"));
// // ++clock;
// // c.sweep();
// // BEAST_EXPECT(c.size() == 1);
// }
// Insert three items (1 over limit), sweep
{
Cache c("test", LedgerIndex(2), 3s, clock, j);
// // Insert three items (1 over limit), sweep
// {
// Cache c("test", LedgerIndex(2), 3s, clock, j);
// // BEAST_EXPECT(c.insert("one"));
// // ++clock;
// // BEAST_EXPECT(c.insert("two"));
// // ++clock;
// // BEAST_EXPECT(c.insert("three"));
// // ++clock;
// // BEAST_EXPECT(c.size() == 3);
// // c.sweep();
// // BEAST_EXPECT(c.size() < 3);
// }
BEAST_EXPECT(c.insert("one"));
++clock;
BEAST_EXPECT(c.insert("two"));
++clock;
BEAST_EXPECT(c.insert("three"));
++clock;
BEAST_EXPECT(c.size() == 3);
c.sweep();
BEAST_EXPECT(c.size() < 3);
}
}
};

View File

@@ -96,7 +96,7 @@ Env::AppBundle::~AppBundle()
if (app)
{
app->getJobQueue().rendezvous();
app->signalStop();
app->signalStop("~AppBundle");
}
if (thread.joinable())
thread.join();

View File

@@ -27,6 +27,8 @@
namespace ripple {
// FIXME: Need to clean up ledgers by index at some point
LedgerHistory::LedgerHistory(
beast::insight::Collector::ptr const& collector,
Application& app)
@@ -45,7 +47,6 @@ LedgerHistory::LedgerHistory(
std::chrono::minutes{5},
stopwatch(),
app_.journal("TaggedCache"))
, mLedgersByIndex(256)
, j_(app.journal("LedgerHistory"))
{
}
@@ -62,18 +63,12 @@ LedgerHistory::insert(
ledger->stateMap().getHash().isNonZero(),
"ripple::LedgerHistory::insert : nonzero hash");
// TODOL merge the below into a single call to avoid lock and race
// conditions, i.e. - return alreadyHad on assignment somehow.
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
bool const alreadyHad = m_ledgers_by_hash.canonicalize_replace_cache(
ledger->info().hash, ledger);
if (validated)
{
mLedgersByIndex[ledger->info().seq] = ledger->info().hash;
JLOG(j_.info()) << "LedgerHistory::insert: mLedgersByIndex size: "
<< mLedgersByIndex.size() << " , total size: "
<< mLedgersByIndex.size() *
(sizeof(LedgerIndex) + sizeof(LedgerHash));
}
return alreadyHad;
}
@@ -81,7 +76,7 @@ LedgerHistory::insert(
LedgerHash
LedgerHistory::getLedgerHash(LedgerIndex index)
{
// TODO: is it safe to get iterator without lock here?
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
if (auto it = mLedgersByIndex.find(index); it != mLedgersByIndex.end())
return it->second;
return {};
@@ -91,12 +86,13 @@ std::shared_ptr<Ledger const>
LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
{
// TODO: this lock is not needed
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
auto it = mLedgersByIndex.find(index);
if (it != mLedgersByIndex.end())
{
uint256 hash = it->second;
sl.unlock();
return getLedgerByHash(hash);
}
}
@@ -112,19 +108,13 @@ LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
// Add this ledger to the local tracking by index
// std::unique_lock sl(m_ledgers_by_hash.peekMutex());
// TODO: make sure that canonicalize_replace_client lock the partition
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
XRPL_ASSERT(
ret->isImmutable(),
"ripple::LedgerHistory::getLedgerBySeq : immutable result ledger");
m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret);
mLedgersByIndex[ret->info().seq] = ret->info().hash;
JLOG(j_.info())
<< "LedgerHistory::getLedgerBySeq: mLedgersByIndex size: "
<< mLedgersByIndex.size() << " , total size: "
<< mLedgersByIndex.size() *
(sizeof(LedgerIndex) + sizeof(LedgerHash));
return (ret->info().seq == index) ? ret : nullptr;
}
}
@@ -468,8 +458,7 @@ LedgerHistory::builtLedger(
XRPL_ASSERT(
!hash.isZero(), "ripple::LedgerHistory::builtLedger : nonzero hash");
// std::unique_lock sl(m_consensus_validated.peekMutex());
// TODO: make sure that canonicalize_replace_client lock the partition
std::unique_lock sl(m_consensus_validated.peekMutex());
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
@@ -511,8 +500,7 @@ LedgerHistory::validatedLedger(
!hash.isZero(),
"ripple::LedgerHistory::validatedLedger : nonzero hash");
// std::unique_lock sl(m_consensus_validated.peekMutex());
// TODO: make sure that canonicalize_replace_client lock the partition
std::unique_lock sl(m_consensus_validated.peekMutex());
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
@@ -547,9 +535,7 @@ LedgerHistory::validatedLedger(
bool
LedgerHistory::fixIndex(LedgerIndex ledgerIndex, LedgerHash const& ledgerHash)
{
// std::unique_lock sl(m_ledgers_by_hash.peekMutex());
// TODO: how to ensure that? "Ensure m_ledgers_by_hash doesn't have the
// wrong hash for a particular index"
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
auto it = mLedgersByIndex.find(ledgerIndex);
if ((it != mLedgersByIndex.end()) && (it->second != ledgerHash))

View File

@@ -22,7 +22,6 @@
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/core/detail/LRUMap.h>
#include <xrpl/beast/insight/Collector.h>
#include <xrpl/protocol/RippleLedgerHash.h>
@@ -150,7 +149,7 @@ private:
ConsensusValidated m_consensus_validated;
// Maps ledger indexes to the corresponding hash.
LRUMap<LedgerIndex, LedgerHash> mLedgersByIndex; // validated ledgers
std::map<LedgerIndex, LedgerHash> mLedgersByIndex; // validated ledgers
beast::Journal j_;
};

View File

@@ -404,7 +404,7 @@ public:
it->second->touch();
++it;
}
else if ((la + std::chrono::seconds(10)) < start)
else if ((la + std::chrono::minutes(1)) < start)
{
stuffToSweep.push_back(it->second);
// shouldn't cause the actual final delete

View File

@@ -285,7 +285,7 @@ public:
config_->CONFIG_DIR),
*this,
logs_->journal("PerfLog"),
[this] { signalStop(); }))
[this] { signalStop("PerfLog"); }))
, m_txMaster(*this)
@@ -505,7 +505,7 @@ public:
void
run() override;
void
signalStop(std::string msg = "") override;
signalStop(std::string msg) override;
bool
checkSigs() const override;
void
@@ -977,7 +977,7 @@ public:
if (!config_->standalone() &&
!getRelationalDatabase().transactionDbHasSpace(*config_))
{
signalStop();
signalStop("Out of transaction DB space");
}
// VFALCO NOTE Does the order of calls matter?
@@ -1193,7 +1193,7 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
JLOG(m_journal.info()) << "Received signal " << signum;
if (signum == SIGTERM || signum == SIGINT)
signalStop();
signalStop("Signal: " + to_string(signum));
});
auto debug_log = config_->getDebugLogFile();

View File

@@ -141,7 +141,7 @@ public:
virtual void
run() = 0;
virtual void
signalStop(std::string msg = "") = 0;
signalStop(std::string msg) = 0;
virtual bool
checkSigs() const = 0;
virtual void

View File

@@ -1701,7 +1701,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
}
}
if (!isTemMalformed(e.result) && validatedLedgerIndex)
if (validatedLedgerIndex)
{
auto [fee, accountSeq, availableSeq] =
app_.getTxQ().getTxRequiredFeeAndSeq(

View File

@@ -162,6 +162,20 @@ std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(int readThreads)
{
auto nscfg = app_.config().section(ConfigSection::nodeDatabase());
// Provide default values:
if (!nscfg.exists("cache_size"))
nscfg.set(
"cache_size",
std::to_string(app_.config().getValueFor(
SizedItem::treeCacheSize, std::nullopt)));
if (!nscfg.exists("cache_age"))
nscfg.set(
"cache_age",
std::to_string(app_.config().getValueFor(
SizedItem::treeCacheAge, std::nullopt)));
std::unique_ptr<NodeStore::Database> db;
if (deleteInterval_)
@@ -255,6 +269,8 @@ SHAMapStoreImp::run()
LedgerIndex lastRotated = state_db_.getState().lastRotated;
netOPs_ = &app_.getOPs();
ledgerMaster_ = &app_.getLedgerMaster();
fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete();
@@ -547,12 +563,16 @@ void
SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
{
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
fullBelowCache_->clear();
}
void
SHAMapStoreImp::freshenCaches()
{
freshenCache(app_.getMasterTransaction().getCache());
if (freshenCache(*treeNodeCache_))
return;
if (freshenCache(app_.getMasterTransaction().getCache()))
return;
}
void

View File

@@ -112,6 +112,8 @@ private:
// as of run() or before
NetworkOPs* netOPs_ = nullptr;
LedgerMaster* ledgerMaster_ = nullptr;
FullBelowCache* fullBelowCache_ = nullptr;
TreeNodeCache* treeNodeCache_ = nullptr;
static constexpr auto nodeStoreName_ = "NodeStore";

View File

@@ -61,7 +61,10 @@ Batch::calculateBaseFee(ReadView const& view, STTx const& tx)
// LCOV_EXCL_START
if (baseFee > maxAmount - view.fees().base)
throw std::overflow_error("XRPAmount overflow");
{
JLOG(debugLog().error()) << "BatchTrace: Base fee overflow detected.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
XRPAmount const batchBase = view.fees().base + baseFee;
@@ -72,32 +75,36 @@ Batch::calculateBaseFee(ReadView const& view, STTx const& tx)
{
auto const& txns = tx.getFieldArray(sfRawTransactions);
XRPL_ASSERT(
txns.size() <= maxBatchTxCount,
"Raw Transactions array exceeds max entries.");
// LCOV_EXCL_START
if (txns.size() > maxBatchTxCount)
throw std::length_error(
"Raw Transactions array exceeds max entries");
{
JLOG(debugLog().error())
<< "BatchTrace: Raw Transactions array exceeds max entries.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
for (STObject txn : txns)
{
STTx const stx = STTx{std::move(txn)};
XRPL_ASSERT(
stx.getTxnType() != ttBATCH, "Inner Batch transaction found.");
// LCOV_EXCL_START
if (stx.getTxnType() == ttBATCH)
throw std::invalid_argument("Inner Batch transaction found");
{
JLOG(debugLog().error())
<< "BatchTrace: Inner Batch transaction found.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
auto const fee = ripple::calculateBaseFee(view, stx);
// LCOV_EXCL_START
if (txnFees > maxAmount - fee)
throw std::overflow_error("XRPAmount overflow");
{
JLOG(debugLog().error())
<< "BatchTrace: XRPAmount overflow in txnFees calculation.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
txnFees += fee;
}
@@ -108,13 +115,14 @@ Batch::calculateBaseFee(ReadView const& view, STTx const& tx)
if (tx.isFieldPresent(sfBatchSigners))
{
auto const& signers = tx.getFieldArray(sfBatchSigners);
XRPL_ASSERT(
signers.size() <= maxBatchTxCount,
"Batch Signers array exceeds max entries.");
// LCOV_EXCL_START
if (signers.size() > maxBatchTxCount)
throw std::length_error("Batch Signers array exceeds max entries");
{
JLOG(debugLog().error())
<< "BatchTrace: Batch Signers array exceeds max entries.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
for (STObject const& signer : signers)
@@ -128,16 +136,28 @@ Batch::calculateBaseFee(ReadView const& view, STTx const& tx)
// LCOV_EXCL_START
if (signerCount > 0 && view.fees().base > maxAmount / signerCount)
throw std::overflow_error("XRPAmount overflow");
{
JLOG(debugLog().error())
<< "BatchTrace: XRPAmount overflow in signerCount calculation.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
XRPAmount signerFees = signerCount * view.fees().base;
// LCOV_EXCL_START
if (signerFees > maxAmount - txnFees)
throw std::overflow_error("XRPAmount overflow");
{
JLOG(debugLog().error())
<< "BatchTrace: XRPAmount overflow in signerFees calculation.";
return XRPAmount{INITIAL_XRP};
}
if (txnFees + signerFees > maxAmount - batchBase)
throw std::overflow_error("XRPAmount overflow");
{
JLOG(debugLog().error())
<< "BatchTrace: XRPAmount overflow in total fee calculation.";
return XRPAmount{INITIAL_XRP};
}
// LCOV_EXCL_STOP
// 10 drops per batch signature + sum of inner tx fees + batchBase

View File

@@ -1,153 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LRU_MAP_H_INCLUDED
#define RIPPLE_APP_LRU_MAP_H_INCLUDED
#include <algorithm>
#include <cstddef>
#include <list>
#include <map>
#include <stdexcept>
#include <utility>
namespace ripple {
/** A fixed-capacity map that evicts the least-recently-used entry.

    operator[] counts as a "use" and refreshes the entry's recency;
    find() is a passive lookup and does not affect eviction order.

    Note: recency bookkeeping walks a std::list, so each access is O(n)
    in the number of entries. This container is only suitable for small
    capacities.
*/
template <typename Key, typename Value>
class LRUMap
{
public:
    /** Construct an LRU map.

        @param capacity Maximum number of entries; must be non-zero.
        @throws std::invalid_argument if capacity is zero. (A zero
                capacity would make the eviction loop in operator[]
                pop from an empty usage list — undefined behavior.)
    */
    explicit LRUMap(std::size_t capacity) : capacity_(capacity)
    {
        if (capacity_ == 0)
            throw std::invalid_argument("LRUMap: capacity must be non-zero");
    }

    /** Access the value for key, default-constructing it if absent.

        Marks the entry as most-recently used. Inserting a new entry may
        evict the least-recently-used entries to stay within capacity.
    */
    Value&
    operator[](Key const& key)
    {
        auto it = data_.find(key);
        if (it != data_.end())
        {
            bump_to_front(key);
            return it->second;
        }

        // About to insert a new entry: evict until there is room for it.
        if (data_.size() >= capacity_)
        {
            std::size_t const excess = (data_.size() + 1) - capacity_;
            for (std::size_t i = 0; i < excess; ++i)
            {
                // The back of the usage list is the least-recently used.
                auto lru = usage_list_.back();
                usage_list_.pop_back();
                data_.erase(lru);
            }
        }

        usage_list_.push_front(key);
        return data_[key];
    }

    /** Look up key without affecting its recency. */
    auto
    find(Key const& key)
    {
        return data_.find(key);
    }

    /** Look up key without affecting its recency. */
    auto
    find(Key const& key) const
    {
        return data_.find(key);
    }

    auto
    begin()
    {
        return data_.begin();
    }

    auto
    begin() const
    {
        return data_.begin();
    }

    auto
    end()
    {
        return data_.end();
    }

    auto
    end() const
    {
        return data_.end();
    }

    /** Remove the entry for key, if present.

        @return true if an entry was removed.
    */
    bool
    erase(Key const& key)
    {
        auto it = data_.find(key);
        if (it == data_.end())
            return false;

        auto const pos =
            std::find(usage_list_.begin(), usage_list_.end(), key);
        if (pos != usage_list_.end())
            usage_list_.erase(pos);

        data_.erase(it);
        return true;
    }

    std::size_t
    size() const noexcept
    {
        return data_.size();
    }

    std::size_t
    capacity() const noexcept
    {
        return capacity_;
    }

    /** Remove all entries. Capacity is unchanged. */
    void
    clear()
    {
        data_.clear();
        usage_list_.clear();
    }

private:
    /** Move key to the front of the usage list (most-recently used). */
    void
    bump_to_front(Key const& key)
    {
        auto const pos =
            std::find(usage_list_.begin(), usage_list_.end(), key);
        if (pos != usage_list_.end())
        {
            usage_list_.erase(pos);
            usage_list_.push_front(key);
        }
    }

    std::size_t capacity_;
    std::map<Key, Value> data_;
    // Keys ordered most-recently used (front) to least-recently used (back).
    std::list<Key> usage_list_;
};
} // namespace ripple
#endif

View File

@@ -33,6 +33,14 @@ DatabaseNodeImp::store(
auto obj = NodeObject::createObject(type, std::move(data), hash);
backend_->store(obj);
if (cache_)
{
// After the store, replace a negative cache entry if there is one
cache_->canonicalize(
hash, obj, [](std::shared_ptr<NodeObject> const& n) {
return n->getType() == hotDUMMY;
});
}
}
void
@@ -41,12 +49,23 @@ DatabaseNodeImp::asyncFetch(
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
{
if (cache_)
{
std::shared_ptr<NodeObject> obj = cache_->fetch(hash);
if (obj)
{
callback(obj->getType() == hotDUMMY ? nullptr : obj);
return;
}
}
Database::asyncFetch(hash, ledgerSeq, std::move(callback));
}
void
DatabaseNodeImp::sweep()
{
if (cache_)
cache_->sweep();
}
std::shared_ptr<NodeObject>
@@ -56,33 +75,64 @@ DatabaseNodeImp::fetchNodeObject(
FetchReport& fetchReport,
bool duplicate)
{
std::shared_ptr<NodeObject> nodeObject = nullptr;
Status status;
std::shared_ptr<NodeObject> nodeObject =
cache_ ? cache_->fetch(hash) : nullptr;
try
if (!nodeObject)
{
status = backend_->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "fetchNodeObject " << hash
<< ": Exception fetching from backend: " << e.what();
Rethrow();
}
JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record not "
<< (cache_ ? "cached" : "found");
switch (status)
Status status;
try
{
status = backend_->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal())
<< "fetchNodeObject " << hash
<< ": Exception fetching from backend: " << e.what();
Rethrow();
}
switch (status)
{
case ok:
if (cache_)
{
if (nodeObject)
cache_->canonicalize_replace_client(hash, nodeObject);
else
{
auto notFound =
NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
if (notFound->getType() != hotDUMMY)
nodeObject = notFound;
}
}
break;
case notFound:
break;
case dataCorrupt:
JLOG(j_.fatal()) << "fetchNodeObject " << hash
<< ": nodestore data is corrupted";
break;
default:
JLOG(j_.warn())
<< "fetchNodeObject " << hash
<< ": backend returns unknown result " << status;
break;
}
}
else
{
case ok:
case notFound:
break;
case dataCorrupt:
JLOG(j_.fatal()) << "fetchNodeObject " << hash
<< ": nodestore data is corrupted";
break;
default:
JLOG(j_.warn()) << "fetchNodeObject " << hash
<< ": backend returns unknown result " << status;
break;
JLOG(j_.trace()) << "fetchNodeObject " << hash
<< ": record found in cache";
if (nodeObject->getType() == hotDUMMY)
nodeObject.reset();
}
if (nodeObject)
@@ -94,33 +144,71 @@ DatabaseNodeImp::fetchNodeObject(
std::vector<std::shared_ptr<NodeObject>>
DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
{
std::vector<std::shared_ptr<NodeObject>> results{hashes.size()};
using namespace std::chrono;
auto const before = steady_clock::now();
std::vector<uint256 const*> batch{hashes.size()};
std::unordered_map<uint256 const*, size_t> indexMap;
std::vector<uint256 const*> cacheMisses;
uint64_t hits = 0;
uint64_t fetches = 0;
for (size_t i = 0; i < hashes.size(); ++i)
{
auto const& hash = hashes[i];
batch.push_back(&hash);
// See if the object already exists in the cache
auto nObj = cache_ ? cache_->fetch(hash) : nullptr;
++fetches;
if (!nObj)
{
// Try the database
indexMap[&hash] = i;
cacheMisses.push_back(&hash);
}
else
{
results[i] = nObj->getType() == hotDUMMY ? nullptr : nObj;
// It was in the cache.
++hits;
}
}
std::vector<std::shared_ptr<NodeObject>> results{hashes.size()};
results = backend_->fetchBatch(batch).first;
for (size_t i = 0; i < results.size(); ++i)
JLOG(j_.debug()) << "fetchBatch - cache hits = "
<< (hashes.size() - cacheMisses.size())
<< " - cache misses = " << cacheMisses.size();
auto dbResults = backend_->fetchBatch(cacheMisses).first;
for (size_t i = 0; i < dbResults.size(); ++i)
{
if (!results[i])
auto nObj = std::move(dbResults[i]);
size_t index = indexMap[cacheMisses[i]];
auto const& hash = hashes[index];
if (nObj)
{
// Ensure all threads get the same object
if (cache_)
cache_->canonicalize_replace_client(hash, nObj);
}
else
{
JLOG(j_.error())
<< "fetchBatch - "
<< "record not found in db. hash = " << strHex(hashes[i]);
<< "record not found in db or cache. hash = " << strHex(hash);
if (cache_)
{
auto notFound = NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
if (notFound->getType() != hotDUMMY)
nObj = std::move(notFound);
}
}
results[index] = std::move(nObj);
}
auto fetchDurationUs =
std::chrono::duration_cast<std::chrono::microseconds>(
steady_clock::now() - before)
.count();
updateFetchMetrics(hashes.size(), 0, fetchDurationUs);
updateFetchMetrics(fetches, hits, fetchDurationUs);
return results;
}

View File

@@ -45,6 +45,38 @@ public:
: Database(scheduler, readThreads, config, j)
, backend_(std::move(backend))
{
std::optional<int> cacheSize, cacheAge;
if (config.exists("cache_size"))
{
cacheSize = get<int>(config, "cache_size");
if (cacheSize.value() < 0)
{
Throw<std::runtime_error>(
"Specified negative value for cache_size");
}
}
if (config.exists("cache_age"))
{
cacheAge = get<int>(config, "cache_age");
if (cacheAge.value() < 0)
{
Throw<std::runtime_error>(
"Specified negative value for cache_age");
}
}
if (cacheSize != 0 || cacheAge != 0)
{
cache_ = std::make_shared<TaggedCache<uint256, NodeObject>>(
"DatabaseNodeImp",
cacheSize.value_or(0),
std::chrono::minutes(cacheAge.value_or(0)),
stopwatch(),
j);
}
XRPL_ASSERT(
backend_,
"ripple::NodeStore::DatabaseNodeImp::DatabaseNodeImp : non-null "
@@ -105,6 +137,9 @@ public:
sweep() override;
private:
// Cache for database objects. This cache is not always initialized. Check
// for null before using.
std::shared_ptr<TaggedCache<uint256, NodeObject>> cache_;
// Persistent key/value storage
std::shared_ptr<Backend> backend_;

View File

@@ -3440,7 +3440,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
if (!m->has_ledgerhash())
info += ", no hash specified";
JLOG(p_journal_.error())
JLOG(p_journal_.warn())
<< "processLedgerRequest: getNodeFat with nodeId "
<< *shaMapNodeId << " and ledger info type " << info
<< " throws exception: " << e.what();

View File

@@ -31,7 +31,7 @@ struct JsonContext;
/** RPC handler for the "stop" command.

    Initiates a graceful server shutdown and returns an acknowledgement
    to the caller; the shutdown itself proceeds asynchronously.
*/
Json::Value
doStop(RPC::JsonContext& context)
{
    // Tag the stop request so logs show it came in via RPC.
    context.app.signalStop("RPC");

    auto const reply = systemName() + " server stopping";
    return RPC::makeObjectValue(reply);
}

View File

@@ -35,8 +35,8 @@ namespace {
// element must be the number of children in a dense array.
constexpr std::array<std::uint8_t, 4> boundaries{
2,
3,
5,
4,
6,
SHAMapInnerNode::branchFactor};
static_assert(
boundaries.size() <= 4,