Add true partitioning support to TaggedCache:

The primary change introduced in this commit is partitioning of
the `TaggedCache`, with each partition being indepedent of each
other, making it possible to potentially perform multiple cache
operations in parallel. In particular, the `sweep` operation is
now parallelized by default on systems with at least four cores
present.

The `TaggedCache` could also be instantiated in 'key-only' mode
which complicated the interface significantly but was only used
by a single consumer (the `FullBelowCache`). This commit splits
the 'key-only' functionality of `TaggedCache`, and incorporates
directly into `FullBelowCache`, resulting in simple and cleaner
interfaces for both `TaggedCache` and `FullBelowCache` but at a
cost: some code duplication.

Lastly, this commit includes a medley of changes, including the
restructuring of `Transaction`, reducing its size by 48 bytes.
This commit is contained in:
Nik Bougalis
2022-09-27 22:39:02 -07:00
parent 8a7913a996
commit ce3f0fcc23
46 changed files with 1712 additions and 1363 deletions

View File

@@ -764,7 +764,6 @@ if (tests)
src/test/basics/Expected_test.cpp
src/test/basics/FileUtilities_test.cpp
src/test/basics/IOUAmount_test.cpp
src/test/basics/KeyCache_test.cpp
src/test/basics/Number_test.cpp
src/test/basics/PerfLog_test.cpp
src/test/basics/RangeSet_test.cpp

View File

@@ -79,6 +79,7 @@ ripple.server > ripple.protocol
ripple.shamap > ripple.basics
ripple.shamap > ripple.beast
ripple.shamap > ripple.crypto
ripple.shamap > ripple.json
ripple.shamap > ripple.nodestore
ripple.shamap > ripple.protocol
test.app > ripple.app

View File

@@ -46,7 +46,7 @@ ConsensusTransSetSF::gotNode(
if (fromFilter)
return;
m_nodeCache.insert(nodeHash, nodeData);
m_nodeCache.insert(nodeHash.as_uint256(), nodeData);
if ((type == SHAMapNodeType::tnTRANSACTION_NM) && (nodeData.size() > 16))
{
@@ -78,23 +78,20 @@ ConsensusTransSetSF::gotNode(
std::optional<Blob>
ConsensusTransSetSF::getNode(SHAMapHash const& nodeHash) const
{
Blob nodeData;
if (m_nodeCache.retrieve(nodeHash, nodeData))
return nodeData;
auto const nh = nodeHash.as_uint256();
auto txn =
app_.getMasterTransaction().fetch_from_cache(nodeHash.as_uint256());
if (auto e = m_nodeCache.fetch(nh))
return *e; // This requires copying the blob.
if (txn)
if (auto txn = app_.getMasterTransaction().fetch_from_cache(nh))
{
// this is a transaction, and we have it
JLOG(j_.trace()) << "Node in our acquiring TX set is TXN we have";
Serializer s;
s.add32(HashPrefix::transactionID);
txn->getSTransaction()->add(s);
assert(sha512Half(s.slice()) == nodeHash.as_uint256());
nodeData = s.peekData();
return nodeData;
assert(sha512Half(s.slice()) == nh);
return std::move(s.modData());
}
return std::nullopt;

View File

@@ -34,7 +34,7 @@ namespace ripple {
class ConsensusTransSetSF : public SHAMapSyncFilter
{
public:
using NodeCache = TaggedCache<SHAMapHash, Blob>;
using NodeCache = TaggedCache<uint256, Blob>;
ConsensusTransSetSF(Application& app, NodeCache& nodeCache);

View File

@@ -34,13 +34,13 @@ LedgerHistory::LedgerHistory(
: app_(app)
, collector_(collector)
, mismatch_counter_(collector->make_counter("ledger.history", "mismatch"))
, m_ledgers_by_hash(
, ledgerCache_(
"LedgerCache",
app_.config().getValueFor(SizedItem::ledgerSize),
std::chrono::seconds{app_.config().getValueFor(SizedItem::ledgerAge)},
stopwatch(),
app_.journal("TaggedCache"))
, m_consensus_validated(
, consensusValidated_(
"ConsensusValidated",
64,
std::chrono::minutes{5},
@@ -60,10 +60,11 @@ LedgerHistory::insert(
assert(ledger->stateMap().getHash().isNonZero());
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
const bool alreadyHad =
ledgerCache_.insert_or_assign(ledger->info().hash, ledger);
std::unique_lock sl(mutex_);
const bool alreadyHad = m_ledgers_by_hash.canonicalize_replace_cache(
ledger->info().hash, ledger);
if (validated)
mLedgersByIndex[ledger->info().seq] = ledger->info().hash;
@@ -73,9 +74,11 @@ LedgerHistory::insert(
LedgerHash
LedgerHistory::getLedgerHash(LedgerIndex index)
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
std::unique_lock sl(mutex_);
if (auto it = mLedgersByIndex.find(index); it != mLedgersByIndex.end())
return it->second;
return {};
}
@@ -83,10 +86,9 @@ std::shared_ptr<Ledger const>
LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
auto it = mLedgersByIndex.find(index);
std::unique_lock sl(mutex_);
if (it != mLedgersByIndex.end())
if (auto it = mLedgersByIndex.find(index); it != mLedgersByIndex.end())
{
uint256 hash = it->second;
sl.unlock();
@@ -100,22 +102,19 @@ LedgerHistory::getLedgerBySeq(LedgerIndex index)
return ret;
assert(ret->info().seq == index);
assert(ret->isImmutable());
ledgerCache_.retrieve_or_insert(ret->info().hash, ret);
{
// Add this ledger to the local tracking by index
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
assert(ret->isImmutable());
m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret);
mLedgersByIndex[ret->info().seq] = ret->info().hash;
return (ret->info().seq == index) ? ret : nullptr;
}
// Add this ledger to the local tracking by index
std::lock_guard sl(mutex_);
mLedgersByIndex[ret->info().seq] = ret->info().hash;
return (ret->info().seq == index) ? ret : nullptr;
}
std::shared_ptr<Ledger const>
LedgerHistory::getLedgerByHash(LedgerHash const& hash)
{
auto ret = m_ledgers_by_hash.fetch(hash);
auto ret = ledgerCache_.fetch(hash);
if (ret)
{
@@ -131,7 +130,7 @@ LedgerHistory::getLedgerByHash(LedgerHash const& hash)
assert(ret->isImmutable());
assert(ret->info().hash == hash);
m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret);
ledgerCache_.retrieve_or_insert(ret->info().hash, ret);
assert(ret->info().hash == hash);
return ret;
@@ -299,21 +298,6 @@ log_metadata_difference(
}
//------------------------------------------------------------------------------
// Return list of leaves sorted by key
static std::vector<SHAMapItem const*>
leaves(SHAMap const& sm)
{
std::vector<SHAMapItem const*> v;
for (auto const& item : sm)
v.push_back(&item);
std::sort(
v.begin(), v.end(), [](SHAMapItem const* lhs, SHAMapItem const* rhs) {
return lhs->key() < rhs->key();
});
return v;
}
void
LedgerHistory::handleMismatch(
LedgerHash const& built,
@@ -341,9 +325,10 @@ LedgerHistory::handleMismatch(
if (auto stream = j_.debug())
{
stream << "Built: " << getJson({*builtLedger, {}});
stream << "Valid: " << getJson({*validLedger, {}});
stream << "Consensus: " << consensus;
stream << "Mismatch on " << builtLedger->info().seq << ":\n"
<< " Built: " << getJson({*builtLedger, {}}) << "\n"
<< " Valid: " << getJson({*validLedger, {}}) << "\n"
<< " Consensus: " << consensus;
}
// Determine the mismatch reason, distinguishing Byzantine
@@ -375,6 +360,23 @@ LedgerHistory::handleMismatch(
<< to_string(*builtConsensusHash);
}
// Grab the leaves from the specified SHAMap and sort them by key:
auto leaves = [](SHAMap const& sm) {
std::vector<SHAMapItem const*> v;
for (auto const& item : sm)
v.push_back(&item);
std::sort(
v.begin(),
v.end(),
[](SHAMapItem const* lhs, SHAMapItem const* rhs) {
return lhs->key() < rhs->key();
});
return v;
};
// Find differences between built and valid ledgers
auto const builtTx = leaves(builtLedger->txMap());
auto const validTx = leaves(validLedger->txMap());
@@ -432,10 +434,8 @@ LedgerHistory::builtLedger(
LedgerHash hash = ledger->info().hash;
assert(!hash.isZero());
std::unique_lock sl(m_consensus_validated.peekMutex());
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
consensusValidated_.retrieve_or_insert(index, entry);
if (entry->validated && !entry->built)
{
@@ -472,61 +472,65 @@ LedgerHistory::validatedLedger(
LedgerHash hash = ledger->info().hash;
assert(!hash.isZero());
std::unique_lock sl(m_consensus_validated.peekMutex());
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
consensusValidated_.retrieve_or_insert(index, entry);
if (entry->built && !entry->validated)
if (entry->built && !entry->validated && entry->built.value() != hash)
{
if (entry->built.value() != hash)
{
JLOG(j_.error())
<< "MISMATCH: seq=" << index
<< " built:" << entry->built.value() << " then:" << hash;
handleMismatch(
entry->built.value(),
hash,
entry->builtConsensusHash,
consensusHash,
entry->consensus.value());
}
else
{
// We built a ledger locally and then validated it
JLOG(j_.debug()) << "MATCH: seq=" << index;
}
JLOG(j_.error()) << "Mismatch on validated ledger (seq " << index
<< "): built is" << entry->built.value()
<< ", validated is:" << hash;
handleMismatch(
entry->built.value(),
hash,
entry->builtConsensusHash,
consensusHash,
entry->consensus.value());
}
entry->validated.emplace(hash);
entry->validatedConsensusHash = consensusHash;
}
/** Ensure m_ledgers_by_hash doesn't have the wrong hash for a particular index
/** Ensure ledgerCache_ doesn't have the wrong hash for a particular index
*/
bool
LedgerHistory::fixIndex(LedgerIndex ledgerIndex, LedgerHash const& ledgerHash)
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
auto it = mLedgersByIndex.find(ledgerIndex);
std::lock_guard sl(mutex_);
if ((it != mLedgersByIndex.end()) && (it->second != ledgerHash))
if (auto it = mLedgersByIndex.find(ledgerIndex);
(it != mLedgersByIndex.end()) && (it->second != ledgerHash))
{
it->second = ledgerHash;
return false;
}
return true;
}
void
LedgerHistory::clearLedgerCachePrior(LedgerIndex seq)
{
for (LedgerHash it : m_ledgers_by_hash.getKeys())
ledgerCache_.erase_if(
[seq](Ledger const& ledger) { return ledger.info().seq < seq; });
}
Json::Value
LedgerHistory::info() const
{
Json::Value ret(Json::objectValue);
ret["lc"] = ledgerCache_.info();
ret["cv"] = consensusValidated_.info();
{
auto const ledger = getLedgerByHash(it);
if (!ledger || ledger->info().seq < seq)
m_ledgers_by_hash.del(it, false);
std::lock_guard sl(mutex_);
ret["lbi"] = std::to_string(mLedgersByIndex.size());
}
return ret;
}
} // namespace ripple

View File

@@ -24,6 +24,7 @@
#include <ripple/app/main/Application.h>
#include <ripple/beast/insight/Collector.h>
#include <ripple/beast/insight/Event.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <optional>
@@ -46,15 +47,6 @@ public:
bool
insert(std::shared_ptr<Ledger const> const& ledger, bool validated);
/** Get the ledgers_by_hash cache hit rate
@return the hit rate
*/
float
getCacheHitRate()
{
return m_ledgers_by_hash.getHitRate();
}
/** Get a ledger given its sequence number */
std::shared_ptr<Ledger const>
getLedgerBySeq(LedgerIndex ledgerIndex);
@@ -75,8 +67,8 @@ public:
void
sweep()
{
m_ledgers_by_hash.sweep();
m_consensus_validated.sweep();
ledgerCache_.sweep();
consensusValidated_.sweep();
}
/** Report that we have locally built a particular ledger */
@@ -103,6 +95,9 @@ public:
void
clearLedgerCachePrior(LedgerIndex seq);
Json::Value
info() const;
private:
/** Log details in the case where we build one ledger but
validate a different one.
@@ -126,9 +121,7 @@ private:
beast::insight::Collector::ptr collector_;
beast::insight::Counter mismatch_counter_;
using LedgersByHash = TaggedCache<LedgerHash, Ledger const>;
LedgersByHash m_ledgers_by_hash;
TaggedCache<LedgerHash, Ledger const> ledgerCache_;
// Maps ledger indexes to the corresponding hashes
// For debug and logging purposes
@@ -145,8 +138,11 @@ private:
// Consensus metadata of built ledger
std::optional<Json::Value> consensus;
};
using ConsensusValidated = TaggedCache<LedgerIndex, cv_entry>;
ConsensusValidated m_consensus_validated;
TaggedCache<LedgerIndex, cv_entry> consensusValidated_;
// Protects mLedgersByIndex
std::mutex mutable mutex_;
// Maps ledger indexes to the corresponding hash.
std::map<LedgerIndex, LedgerHash> mLedgersByIndex; // validated ledgers

View File

@@ -33,6 +33,7 @@
#include <ripple/basics/UptimeClock.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/insight/Collector.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/protocol/STValidation.h>
@@ -221,8 +222,6 @@ public:
void
sweep();
float
getCacheHitRate();
void
checkAccept(std::shared_ptr<Ledger const> const& ledger);
@@ -296,6 +295,9 @@ public:
std::optional<uint256>
txnIDfromIndex(uint32_t ledgerSeq, uint32_t txnIndex);
Json::Value
info() const;
private:
void
setValidLedger(std::shared_ptr<Ledger const> const& l);
@@ -368,7 +370,7 @@ private:
// A set of transactions to replay during the next close
std::unique_ptr<LedgerReplay> replayData;
std::recursive_mutex mCompleteLock;
std::recursive_mutex mutable mCompleteLock;
RangeSet<std::uint32_t> mCompleteLedgers;
// Publish thread is running.

View File

@@ -76,14 +76,14 @@ public:
bool
inLedger(uint256 const& hash, std::uint32_t ledger);
void
canonicalize(std::shared_ptr<Transaction>* pTransaction);
[[nodiscard]] std::shared_ptr<Transaction>
canonicalize(std::shared_ptr<Transaction> tx);
void
sweep(void);
sweep();
TaggedCache<uint256, Transaction>&
getCache();
Json::Value
info() const;
private:
Application& mApp;

View File

@@ -1881,12 +1881,6 @@ LedgerMaster::sweep()
fetch_packs_.sweep();
}
float
LedgerMaster::getCacheHitRate()
{
return mLedgerHistory.getCacheHitRate();
}
void
LedgerMaster::clearPriorLedgers(LedgerIndex seq)
{
@@ -2143,20 +2137,22 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
void
LedgerMaster::addFetchPack(uint256 const& hash, std::shared_ptr<Blob> data)
{
fetch_packs_.canonicalize_replace_client(hash, data);
fetch_packs_.retrieve_or_insert(hash, data);
}
std::optional<Blob>
LedgerMaster::getFetchPack(uint256 const& hash)
{
Blob data;
if (fetch_packs_.retrieve(hash, data))
std::optional<Blob> ret;
// We retrieve the data from the cache _and_ remove it.
if (auto bp = fetch_packs_.fetch(hash, true);
bp && hash == sha512Half(makeSlice(*bp)))
{
fetch_packs_.del(hash, false);
if (hash == sha512Half(makeSlice(data)))
return data;
ret.emplace(std::move(*bp));
}
return std::nullopt;
return ret;
}
void
@@ -2366,6 +2362,34 @@ LedgerMaster::getFetchPackCacheSize() const
return fetch_packs_.getCacheSize();
}
Json::Value
LedgerMaster::info() const
{
Json::Value ret(Json::objectValue);
ret["fetch_packs"] = fetch_packs_.info();
{
std::lock_guard sl(mCompleteLock);
ret["complete_ledgers"] = to_string(mCompleteLedgers);
}
{
std::lock_guard sl(m_mutex);
ret["validated"] = Json::objectValue;
ret["validated"]["hash"] = to_string(mLastValidLedger.first);
ret["validated"]["index"] = mLastValidLedger.second;
}
ret["history"] = mLedgerHistory.info();
ret["fetch_depth"] = fetch_depth_;
ret["ledger_history"] = ledger_history_;
ret["ledger_fetch_size"] = ledger_fetch_size_;
return ret;
}
// Returns the minimum ledger sequence in SQL database, if any.
std::optional<LedgerIndex>
LedgerMaster::minSqlSeq()

View File

@@ -73,7 +73,7 @@ TransactionMaster::fetch(uint256 const& txnID, error_code_i& ec)
auto [txn, txnMeta] = std::get<TxPair>(v);
if (txn)
mCache.canonicalize_replace_client(txnID, txn);
mCache.retrieve_or_insert(txnID, txn);
return std::pair{std::move(txn), std::move(txnMeta)};
}
@@ -100,7 +100,7 @@ TransactionMaster::fetch(
auto [txn, txnMeta] = std::get<TxPair>(v);
if (txn)
mCache.canonicalize_replace_client(txnID, txn);
mCache.retrieve_or_insert(txnID, txn);
return std::pair{std::move(txn), std::move(txnMeta)};
}
@@ -139,29 +139,26 @@ TransactionMaster::fetch(
return txn;
}
void
TransactionMaster::canonicalize(std::shared_ptr<Transaction>* pTransaction)
std::shared_ptr<Transaction>
TransactionMaster::canonicalize(std::shared_ptr<Transaction> tx)
{
uint256 const tid = (*pTransaction)->getID();
if (tid != beast::zero)
{
auto txn = *pTransaction;
// VFALCO NOTE canonicalize can change the value of txn!
mCache.canonicalize_replace_client(tid, txn);
*pTransaction = txn;
}
// Note that retrieve_or_insert can change the value of tx
if (uint256 const tid = tx->getID(); tid != beast::zero)
mCache.retrieve_or_insert(tid, tx);
return tx;
}
void
TransactionMaster::sweep(void)
TransactionMaster::sweep()
{
mCache.sweep();
}
TaggedCache<uint256, Transaction>&
TransactionMaster::getCache()
Json::Value
TransactionMaster::info() const
{
return mCache;
return mCache.info();
}
} // namespace ripple

View File

@@ -54,17 +54,13 @@ class ShardArchiveHandler;
// VFALCO TODO Fix forward declares required for header dependency loops
class AmendmentTable;
template <
class Key,
class T,
bool IsKeyCache,
class Hash,
class KeyEqual,
class Mutex>
template <class Key, class Value, std::size_t N, class Hash, class KeyEqual>
class TaggedCache;
class STLedgerEntry;
using SLE = STLedgerEntry;
using CachedSLEs = TaggedCache<uint256, SLE const>;
class CachedSLEs;
class CollectorManager;
class Family;
@@ -105,7 +101,7 @@ class SHAMapStore;
class ReportingETL;
using NodeCache = TaggedCache<SHAMapHash, Blob>;
using NodeCache = TaggedCache<uint256, Blob>;
template <class Adaptor>
class Validations;

View File

@@ -1183,9 +1183,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
return;
}
std::string reason;
auto tx = std::make_shared<Transaction>(trans, reason, app_);
auto tx = std::make_shared<Transaction>(trans);
m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
auto t = tx;
@@ -1250,8 +1248,7 @@ NetworkOPsImp::processTransaction(
return;
}
// canonicalize can change our pointer
app_.getMasterTransaction().canonicalize(&transaction);
transaction = app_.getMasterTransaction().canonicalize(transaction);
if (bLocal)
doTransactionSync(transaction, bUnlimited, failType);
@@ -1454,9 +1451,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
auto const txNext = m_ledgerMaster.popAcctTransaction(txCur);
if (txNext)
{
std::string reason;
auto const trans = sterilize(*txNext);
auto t = std::make_shared<Transaction>(trans, reason, app_);
auto t = std::make_shared<Transaction>(trans);
submit_held.emplace_back(t, false, false, FailHard::no);
t->setApplying();
}
@@ -2984,7 +2980,7 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
if (!alpAccepted)
{
alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted, app_);
app_.getAcceptedLedgerCache().canonicalize_replace_client(
app_.getAcceptedLedgerCache().retrieve_or_insert(
lpAccepted->info().hash, alpAccepted);
}

View File

@@ -290,8 +290,7 @@ SHAMapStoreImp::run()
LedgerIndex lastRotated = state_db_.getState().lastRotated;
netOPs_ = &app_.getOPs();
ledgerMaster_ = &app_.getLedgerMaster();
fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache(0));
treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache(0));
fullBelowCache_ = app_.getNodeFamily().getFullBelowCache(0);
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete();
@@ -388,13 +387,6 @@ SHAMapStoreImp::run()
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
<< " nodecount " << nodeCount;
JLOG(journal_.debug()) << "freshening caches";
freshenCaches();
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
JLOG(journal_.trace()) << "Making a new backend";
auto newBackend = makeBackendRotating();
JLOG(journal_.debug())
@@ -604,15 +596,6 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
fullBelowCache_->clear();
}
void
SHAMapStoreImp::freshenCaches()
{
if (freshenCache(*treeNodeCache_))
return;
if (freshenCache(app_.getMasterTransaction().getCache()))
return;
}
void
SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
{

View File

@@ -114,8 +114,8 @@ private:
// as of run() or before
NetworkOPs* netOPs_ = nullptr;
LedgerMaster* ledgerMaster_ = nullptr;
FullBelowCache* fullBelowCache_ = nullptr;
TreeNodeCache* treeNodeCache_ = nullptr;
std::shared_ptr<FullBelowCache> fullBelowCache_;
static constexpr auto nodeStoreName_ = "NodeStore";
@@ -188,23 +188,6 @@ private:
std::unique_ptr<NodeStore::Backend>
makeBackendRotating(std::string path = std::string());
template <class CacheInstance>
bool
freshenCache(CacheInstance& cache)
{
std::uint64_t check = 0;
for (auto const& key : cache.getKeys())
{
dbRotating_->fetchNodeObject(
key, 0, NodeStore::FetchType::synchronous, true);
if (!(++check % checkHealthInterval_) && healthWait() == stopping)
return true;
}
return false;
}
/** delete from sqlite table in batches to not lock the db excessively.
* Pause briefly to extend access time to other users.
* Call with mutex object unlocked.
@@ -218,8 +201,6 @@ private:
void
clearCaches(LedgerIndex validatedSeq);
void
freshenCaches();
void
clearPrior(LedgerIndex lastRotated);
/**

View File

@@ -21,7 +21,6 @@
#define RIPPLE_APP_MISC_TRANSACTION_H_INCLUDED
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/STTx.h>
@@ -42,7 +41,7 @@ class Application;
class Database;
class Rules;
enum TransStatus {
enum TransStatus : std::uint8_t {
NEW = 0, // just received / generated
INVALID = 1, // no valid signature, insufficient funds
INCLUDED = 2, // added to the current ledger
@@ -56,33 +55,28 @@ enum TransStatus {
enum class TxSearched { all, some, unknown };
// The boost::optional parameter is because SOCI requires
// boost::optional (not std::optional) parameters.
TransStatus
sqlTransactionStatus(boost::optional<std::string> const& status);
// This class is for constructing and examining transactions.
// Transactions are static so manipulation functions are unnecessary.
class Transaction : public std::enable_shared_from_this<Transaction>,
public CountedObject<Transaction>
{
/** Flags used for the state of a transaction. */
constexpr static std::uint8_t const applied_ = 1;
constexpr static std::uint8_t const broadcast_ = 2;
constexpr static std::uint8_t const queued_ = 4;
constexpr static std::uint8_t const kept_ = 8;
constexpr static std::uint8_t const applying_ = 16;
public:
using pointer = std::shared_ptr<Transaction>;
using ref = const pointer&;
Transaction(
std::shared_ptr<STTx const> const&,
std::string&,
Application&) noexcept;
// The two boost::optional parameters are because SOCI requires
// boost::optional (not std::optional) parameters.
static Transaction::pointer
transactionFromSQL(
boost::optional<std::uint64_t> const& ledgerSeq,
boost::optional<std::string> const& status,
Blob const& rawTxn,
Application& app);
// The boost::optional parameter is because SOCI requires
// boost::optional (not std::optional) parameters.
static TransStatus
sqlTransactionStatus(boost::optional<std::string> const& status);
Transaction(std::shared_ptr<STTx const> const& stx) noexcept
: mTransaction(stx)
{
}
std::shared_ptr<STTx const> const&
getSTransaction()
@@ -90,10 +84,10 @@ public:
return mTransaction;
}
uint256 const&
uint256
getID() const
{
return mTransactionID;
return mTransaction->getTransactionID();
}
LedgerIndex
@@ -111,7 +105,7 @@ public:
TransStatus
getStatus() const
{
return mStatus;
return status_;
}
TER
@@ -127,18 +121,16 @@ public:
}
void
setStatus(TransStatus status, std::uint32_t ledgerSeq);
setStatus(TransStatus status, std::uint32_t ledgerSeq)
{
status_ = status;
mInLedger = ledgerSeq;
}
void
setStatus(TransStatus status)
{
mStatus = status;
}
void
setLedger(LedgerIndex ledger)
{
mInLedger = ledger;
status_ = status;
}
/**
@@ -147,7 +139,7 @@ public:
void
setApplying()
{
mApplying = true;
state_ |= applying_;
}
/**
@@ -158,7 +150,7 @@ public:
bool
getApplying()
{
return mApplying;
return (state_ & applying_) == applying_;
}
/**
@@ -167,37 +159,33 @@ public:
void
clearApplying()
{
mApplying = false;
state_ &= ~applying_;
}
struct SubmitResult
class SubmitResult
{
/**
* @brief clear Clear all states
*/
void
clear()
friend class Transaction;
SubmitResult(std::uint8_t state)
: applied(state & applied_)
, broadcast(state & broadcast_)
, queued(state & queued_)
, kept(state & kept_)
{
applied = false;
broadcast = false;
queued = false;
kept = false;
}
/**
* @brief any Get true of any state is true
* @return True if any state if true
*/
public:
bool const applied;
bool const broadcast;
bool const queued;
bool const kept;
/** Returns true if any state if true */
bool
any() const
{
return applied || broadcast || queued || kept;
}
bool applied = false;
bool broadcast = false;
bool queued = false;
bool kept = false;
};
/**
@@ -207,7 +195,7 @@ public:
SubmitResult
getSubmitResult() const
{
return submitResult_;
return {state_.load()};
}
/**
@@ -216,43 +204,35 @@ public:
void
clearSubmitResult()
{
submitResult_.clear();
state_ &= ~(applied_ | broadcast_ | queued_ | kept_);
}
/**
* @brief setApplied Set this flag once was applied to open ledger
*/
/** Note that the transaction was applied to open ledger */
void
setApplied()
{
submitResult_.applied = true;
state_ |= applied_;
}
/**
* @brief setQueued Set this flag once was put into heldtxns queue
*/
/** Note that the trnasaction was put into heldtxns queue */
void
setQueued()
{
submitResult_.queued = true;
state_ |= queued_;
}
/**
* @brief setBroadcast Set this flag once was broadcasted via network
*/
/** Note that the transaction was broadcast via network */
void
setBroadcast()
{
submitResult_.broadcast = true;
state_ |= broadcast_;
}
/**
* @brief setKept Set this flag once was put to localtxns queue
*/
/** Note that the transaction was put to localtxns queue */
void
setKept()
{
submitResult_.kept = true;
state_ |= kept_;
}
struct CurrentLedgerState
@@ -306,7 +286,7 @@ public:
}
Json::Value
getJson(JsonOptions options, bool binary = false) const;
getJson(Application& app, JsonOptions options, bool binary = false) const;
// Information used to locate a transaction.
// Contains a nodestore hash and ledger sequence pair if the transaction was
@@ -384,21 +364,20 @@ private:
std::optional<ClosedInterval<uint32_t>> const& range,
error_code_i& ec);
uint256 mTransactionID;
/** The ledger in which this transaction appears, or 0. */
LedgerIndex mInLedger = 0;
TransStatus mStatus = INVALID;
/** The result of applying this transaction. */
TER mResult = temUNCERTAIN;
bool mApplying = false;
/** different ways for transaction to be accepted */
SubmitResult submitResult_;
/** The current state of the transaction. */
std::atomic<std::uint8_t> state_ = 0;
std::optional<CurrentLedgerState> currentLedgerState_;
/** The status of the transaction. */
TransStatus status_ = NEW;
std::shared_ptr<STTx const> mTransaction;
Application& mApp;
beast::Journal j_;
std::optional<CurrentLedgerState> currentLedgerState_;
};
} // namespace ripple

View File

@@ -40,17 +40,14 @@ convertBlobsToTxResult(
{
SerialIter it(makeSlice(rawTxn));
auto txn = std::make_shared<STTx const>(it);
std::string reason;
auto tr = std::make_shared<Transaction>(txn, reason, app);
auto tr = std::make_shared<Transaction>(txn);
tr->setStatus(Transaction::sqlTransactionStatus(status));
tr->setLedger(ledger_index);
tr->setStatus(sqlTransactionStatus(status), ledger_index);
auto metaset =
std::make_shared<TxMeta>(tr->getID(), tr->getLedger(), rawMeta);
to.emplace_back(std::move(tr), metaset);
to.emplace_back(
std::move(tr),
std::make_shared<TxMeta>(tr->getID(), tr->getLedger(), rawMeta));
};
void

View File

@@ -35,38 +35,12 @@
namespace ripple {
Transaction::Transaction(
std::shared_ptr<STTx const> const& stx,
std::string& reason,
Application& app) noexcept
: mTransaction(stx), mApp(app), j_(app.journal("Ledger"))
{
try
{
mTransactionID = mTransaction->getTransactionID();
}
catch (std::exception& e)
{
reason = e.what();
return;
}
mStatus = NEW;
}
//
// Misc.
//
void
Transaction::setStatus(TransStatus ts, std::uint32_t lseq)
{
mStatus = ts;
mInLedger = lseq;
}
TransStatus
Transaction::sqlTransactionStatus(boost::optional<std::string> const& status)
sqlTransactionStatus(boost::optional<std::string> const& status)
{
char const c = (status) ? (*status)[0] : safe_cast<char>(txnSqlUnknown);
@@ -88,26 +62,6 @@ Transaction::sqlTransactionStatus(boost::optional<std::string> const& status)
return INVALID;
}
Transaction::pointer
Transaction::transactionFromSQL(
boost::optional<std::uint64_t> const& ledgerSeq,
boost::optional<std::string> const& status,
Blob const& rawTxn,
Application& app)
{
std::uint32_t const inLedger =
rangeCheckedCast<std::uint32_t>(ledgerSeq.value_or(0));
SerialIter it(makeSlice(rawTxn));
auto txn = std::make_shared<STTx const>(it);
std::string reason;
auto tr = std::make_shared<Transaction>(txn, reason, app);
tr->setStatus(sqlTransactionStatus(status));
tr->setLedger(inLedger);
return tr;
}
std::variant<
std::pair<std::shared_ptr<Transaction>, std::shared_ptr<TxMeta>>,
TxSearched>
@@ -165,7 +119,7 @@ Transaction::load(
// options 1 to include the date of the transaction
Json::Value
Transaction::getJson(JsonOptions options, bool binary) const
Transaction::getJson(Application& app, JsonOptions options, bool binary) const
{
Json::Value ret(mTransaction->getJson(JsonOptions::none, binary));
@@ -176,7 +130,7 @@ Transaction::getJson(JsonOptions options, bool binary) const
if (options == JsonOptions::include_date)
{
auto ct = mApp.getLedgerMaster().getCloseTimeBySeq(mInLedger);
auto ct = app.getLedgerMaster().getCloseTimeBySeq(mInLedger);
if (ct)
ret[jss::date] = ct->time_since_epoch().count();
}

View File

@@ -231,7 +231,7 @@ saveValidatedLedger(
if (!aLedger)
{
aLedger = std::make_shared<AcceptedLedger>(ledger, app);
app.getAcceptedLedgerCache().canonicalize_replace_client(
app.getAcceptedLedgerCache().retrieve_or_insert(
ledger->info().hash, aLedger);
}
}
@@ -617,6 +617,36 @@ getHashesByIndex(
return res;
}
/** Construct a transaction object from the database.
@param ledgerSeq if set, specifies the ledger in which this transaction was
was included.
@param status if set, specifies the status that the server believes this
transaction achieved.
@param rawTxn The transaction, in serialized format.
@param app The main application object.
@note The two optional parameters use boost::optional because that's the
interface that SOCI expects.
*/
static std::shared_ptr<Transaction>
transactionFromSQL(
boost::optional<std::uint64_t> const& ledgerSeq,
boost::optional<std::string> const& status,
Blob const& rawTxn,
Application& app)
{
std::uint32_t const inLedger =
rangeCheckedCast<std::uint32_t>(ledgerSeq.value_or(0));
SerialIter it(makeSlice(rawTxn));
auto txn = std::make_shared<STTx const>(it);
auto tr = std::make_shared<Transaction>(txn);
tr->setStatus(sqlTransactionStatus(status), inLedger);
return tr;
}
std::pair<std::vector<std::shared_ptr<Transaction>>, int>
getTxHistory(
soci::session& session,
@@ -656,8 +686,7 @@ getTxHistory(
else
rawTxn.clear();
if (auto trans = Transaction::transactionFromSQL(
ledgerSeq, status, rawTxn, app))
if (auto trans = transactionFromSQL(ledgerSeq, status, rawTxn, app))
{
total++;
txs.push_back(trans);
@@ -859,8 +888,7 @@ getAccountTxs(
else
txnMeta.clear();
auto txn =
Transaction::transactionFromSQL(ledgerSeq, status, rawTxn, app);
auto txn = transactionFromSQL(ledgerSeq, status, rawTxn, app);
if (txnMeta.empty())
{ // Work around a bug that could leave the metadata missing
@@ -1342,8 +1370,7 @@ getTransaction(
try
{
auto txn =
Transaction::transactionFromSQL(ledgerSeq, status, rawTxn, app);
auto txn = transactionFromSQL(ledgerSeq, status, rawTxn, app);
if (!ledgerSeq)
return std::pair{std::move(txn), nullptr};

View File

@@ -78,7 +78,7 @@ saveLedgerMeta(
if (!aLedger)
{
aLedger = std::make_shared<AcceptedLedger>(ledger, app);
app.getAcceptedLedgerCache().canonicalize_replace_client(
app.getAcceptedLedgerCache().retrieve_or_insert(
ledger->info().hash, aLedger);
}

View File

@@ -376,10 +376,8 @@ flatFetchTransactions(
else
{
auto& transactions = std::get<TxnsData>(ret);
std::string reason;
auto txnRet = std::make_shared<Transaction>(txn, reason, app);
txnRet->setLedger(ledgerSequences[i]);
txnRet->setStatus(COMMITTED);
auto txnRet = std::make_shared<Transaction>(txn);
txnRet->setStatus(COMMITTED, ledgerSequences[i]);
auto txMeta = std::make_shared<TxMeta>(
txnRet->getID(), ledgerSequences[i], *meta);
transactions.push_back(std::make_pair(txnRet, txMeta));
@@ -829,10 +827,8 @@ PostgresDatabaseImp::getTxHistory(LedgerIndex startIndex)
auto const& [sttx, meta] = txns[i];
assert(sttx);
std::string reason;
auto txn = std::make_shared<Transaction>(sttx, reason, app_);
txn->setLedger(ledgerSequences[i]);
txn->setStatus(COMMITTED);
auto txn = std::make_shared<Transaction>(sttx);
txn->setStatus(COMMITTED, ledgerSequences[i]);
ret.push_back(txn);
}

View File

@@ -1,32 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_KEYCACHE_H
#define RIPPLE_BASICS_KEYCACHE_H
#include <ripple/basics/TaggedCache.h>
#include <ripple/basics/base_uint.h>
namespace ripple {
using KeyCache = TaggedCache<uint256, int, true>;
} // namespace ripple
#endif // RIPPLE_BASICS_KEYCACHE_H

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,214 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2022 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_UTILITY_ATOMIC_SHARED_PTR_INCLUDED
#define BEAST_UTILITY_ATOMIC_SHARED_PTR_INCLUDED
// Temporarily shim in support for atomic<shared_ptr<T>>
#ifndef __cpp_lib_atomic_shared_ptr
namespace std {
template <class T>
struct atomic<shared_ptr<T>>
{
private:
shared_ptr<T> p_;
public:
constexpr atomic() noexcept = default;
atomic(const atomic&) = delete;
void
operator=(const atomic&) = delete;
constexpr atomic(nullptr_t) noexcept;
atomic(shared_ptr<T> desired) noexcept;
using value_type = shared_ptr<T>;
static constexpr bool is_always_lock_free = false;
bool
is_lock_free() const noexcept;
shared_ptr<T>
load(memory_order order = memory_order_seq_cst) const noexcept;
operator shared_ptr<T>() const noexcept;
void
store(
shared_ptr<T> desired,
memory_order order = memory_order_seq_cst) noexcept;
void
operator=(shared_ptr<T> desired) noexcept;
shared_ptr<T>
exchange(
shared_ptr<T> desired,
memory_order order = memory_order_seq_cst) noexcept;
bool
compare_exchange_weak(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order order = memory_order_seq_cst) noexcept;
bool
compare_exchange_weak(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order success,
memory_order failure) noexcept;
bool
compare_exchange_strong(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order order = memory_order_seq_cst) noexcept;
bool
compare_exchange_strong(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order success,
memory_order failure) noexcept;
// Not supported:
// void wait(shared_ptr<T> old,
// memory_order order = memory_order_seq_cst) const noexcept;
// void notify_one() noexcept;
// void notify_all() noexcept;
private:
static constexpr memory_order
transform(memory_order order) noexcept;
};
template <class T>
inline constexpr atomic<shared_ptr<T>>::atomic(nullptr_t) noexcept : atomic()
{
}
template <class T>
inline atomic<shared_ptr<T>>::atomic(shared_ptr<T> desired) noexcept
: p_{desired}
{
}
template <class T>
inline bool
atomic<shared_ptr<T>>::is_lock_free() const noexcept
{
return atomic_is_lock_free(&p_);
}
template <class T>
inline shared_ptr<T>
atomic<shared_ptr<T>>::load(memory_order order) const noexcept
{
return atomic_load_explicit(&p_, order);
}
template <class T>
inline atomic<shared_ptr<T>>::operator shared_ptr<T>() const noexcept
{
return load();
}
template <class T>
inline void
atomic<shared_ptr<T>>::store(shared_ptr<T> desired, memory_order order) noexcept
{
atomic_store_explicit(&p_, std::move(desired), order);
}
template <class T>
inline void
atomic<shared_ptr<T>>::operator=(shared_ptr<T> desired) noexcept
{
store(std::move(desired));
}
template <class T>
inline shared_ptr<T>
atomic<shared_ptr<T>>::exchange(
shared_ptr<T> desired,
memory_order order) noexcept
{
return atomic_exchange_explicit(&p_, std::move(desired), order);
}
template <class T>
inline constexpr memory_order
atomic<shared_ptr<T>>::transform(memory_order order) noexcept
{
memory_order fail_order = order;
if (fail_order == memory_order_acq_rel)
order = memory_order_acquire;
else if (fail_order == memory_order_release)
order = memory_order_relaxed;
return order;
}
template <class T>
inline bool
atomic<shared_ptr<T>>::compare_exchange_weak(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order order) noexcept
{
return compare_exchange_weak(
expected, std::move(desired), order, transform(order));
}
template <class T>
inline bool
atomic<shared_ptr<T>>::compare_exchange_weak(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order success,
memory_order failure) noexcept
{
return atomic_compare_exchange_weak_explicit(
&p_, &expected, std::move(desired), success, failure);
}
template <class T>
inline bool
atomic<shared_ptr<T>>::compare_exchange_strong(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order order) noexcept
{
return compare_exchange_weak(
expected, std::move(desired), order, transform(order));
}
template <class T>
inline bool
atomic<shared_ptr<T>>::compare_exchange_strong(
shared_ptr<T>& expected,
shared_ptr<T> desired,
memory_order success,
memory_order failure) noexcept
{
return atomic_compare_exchange_strong_explicit(
&p_, &expected, std::move(desired), success, failure);
}
} // namespace std
#endif // __cpp_lib_atomic_shared_ptr
#endif

View File

@@ -25,7 +25,56 @@
#include <ripple/protocol/STLedgerEntry.h>
namespace ripple {
using CachedSLEs = TaggedCache<uint256, SLE const>;
}
class CachedSLEs : public TaggedCache<uint256, SLE const>
{
std::atomic<std::uint64_t> handlerHits_ = 0;
std::atomic<std::uint64_t> handlerMisses_ = 0;
public:
using TaggedCache::TaggedCache;
/** Fetch from the cache; if needed, invoke the handler to load the item. */
template <class Handler>
std::shared_ptr<SLE const>
fetch(uint256 const& digest, Handler const& handler)
{
if (auto ret = TaggedCache::fetch(digest))
return ret;
if (auto sle = handler(); sle)
{
if (retrieve_or_insert(digest, sle))
handlerHits_++;
return sle;
}
handlerMisses_++;
return {};
}
// Reintroduce the function we just hid.
using TaggedCache::fetch;
/** Returns the fraction of cache hits. */
double
rate() const
{
// TODO
return 0;
}
Json::Value
info()
{
auto ret = TaggedCache::info();
ret["handler_hits"] = std::to_string(handlerHits_.load());
ret["handler_misses"] = std::to_string(handlerMisses_.load());
return ret;
}
};
} // namespace ripple
#endif // RIPPLE_LEDGER_CACHEDSLES_H_INCLUDED

View File

@@ -105,12 +105,12 @@ DatabaseNodeImp::fetchNodeObject(
if (cache_)
{
if (nodeObject)
cache_->canonicalize_replace_client(hash, nodeObject);
cache_->retrieve_or_insert(hash, nodeObject);
else
{
auto notFound =
NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
cache_->retrieve_or_insert(hash, notFound);
if (notFound->getType() != hotDUMMY)
nodeObject = notFound;
}
@@ -188,7 +188,7 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
{
// Ensure all threads get the same object
if (cache_)
cache_->canonicalize_replace_client(hash, nObj);
cache_->retrieve_or_insert(hash, nObj);
}
else
{
@@ -198,7 +198,7 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
if (cache_)
{
auto notFound = NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
cache_->retrieve_or_insert(hash, notFound);
if (notFound->getType() != hotDUMMY)
nObj = std::move(notFound);
}

View File

@@ -174,8 +174,7 @@ Shard::tryClose()
acquireInfo_.reset();
// Reset caches to reduce memory use
app_.getShardFamily()->getFullBelowCache(lastSeq_)->reset();
app_.getShardFamily()->getTreeNodeCache(lastSeq_)->reset();
app_.getShardFamily()->resetCacheFor(lastSeq_);
return true;
}
@@ -647,12 +646,11 @@ Shard::finalize(bool writeSQLite, std::optional<uint256> const& referenceHash)
std::shared_ptr<Ledger const> next;
auto const lastLedgerHash{hash};
auto& shardFamily{*app_.getShardFamily()};
auto const fullBelowCache{shardFamily.getFullBelowCache(lastSeq_)};
auto const treeNodeCache{shardFamily.getTreeNodeCache(lastSeq_)};
// Reset caches to reduce memory usage
fullBelowCache->reset();
treeNodeCache->reset();
shardFamily.resetCacheFor(lastSeq_);
auto const fullBelowCache = shardFamily.getFullBelowCache(lastSeq_);
auto const treeNodeCache = shardFamily.getTreeNodeCache(lastSeq_);
Serializer s;
s.add32(version);
@@ -720,8 +718,7 @@ Shard::finalize(bool writeSQLite, std::optional<uint256> const& referenceHash)
--ledgerSeq;
fullBelowCache->reset();
treeNodeCache->reset();
shardFamily.resetCacheFor(lastSeq_);
}
JLOG(j_.debug()) << "shard " << index_ << " is valid";

View File

@@ -23,7 +23,6 @@
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/rdb/RelationalDatabase.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/basics/KeyCache.h>
#include <ripple/basics/MathUtilities.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/basics/ThreadSafetyAnalysis.h>
@@ -40,8 +39,6 @@
namespace ripple {
namespace NodeStore {
using PCache = TaggedCache<uint256, NodeObject>;
using NCache = KeyCache;
class DatabaseShard;
/* A range of historical ledgers backed by a node store.

View File

@@ -390,7 +390,6 @@ JSS(ledger_hash); // in: RPCHelpers, LedgerRequest,
// out: NetworkOPs, RPCHelpers,
// LedgerClosed, LedgerData,
// AccountLines
JSS(ledger_hit_rate); // out: GetCounts
JSS(ledger_index); // in/out: many
JSS(ledger_index_max); // in, out: AccountTx*
JSS(ledger_index_min); // in, out: AccountTx*

View File

@@ -299,7 +299,8 @@ populateJsonResponse(
{
Json::Value& jvObj = jvTxns.append(Json::objectValue);
jvObj[jss::tx] = txn->getJson(JsonOptions::include_date);
jvObj[jss::tx] =
txn->getJson(context.app, JsonOptions::include_date);
if (txnMeta)
{
jvObj[jss::meta] =

View File

@@ -103,21 +103,22 @@ getCountsJson(Application& app, int minObjectCount)
}
}
ret["ledger_master"] = app.getLedgerMaster().info();
auto& caches = (ret["caches"] = Json::arrayValue);
{
caches.append(app.getNodeFamily().getFullBelowCache(0)->info());
caches.append(app.getNodeFamily().getTreeNodeCache(0)->info());
caches.append(app.getTempNodeCache().info());
caches.append(app.getAcceptedLedgerCache().info());
caches.append(app.cachedSLEs().info());
}
ret[jss::write_load] = app.getNodeStore().getWriteLoad();
ret[jss::historical_perminute] =
static_cast<int>(app.getInboundLedgers().fetchRate());
ret[jss::SLE_hit_rate] = app.cachedSLEs().rate();
ret[jss::ledger_hit_rate] = app.getLedgerMaster().getCacheHitRate();
ret[jss::AL_size] = Json::UInt(app.getAcceptedLedgerCache().size());
ret[jss::AL_hit_rate] = app.getAcceptedLedgerCache().getHitRate();
ret[jss::fullbelow_size] =
static_cast<int>(app.getNodeFamily().getFullBelowCache(0)->size());
ret[jss::treenode_cache_size] =
app.getNodeFamily().getTreeNodeCache(0)->getCacheSize();
ret[jss::treenode_track_size] =
app.getNodeFamily().getTreeNodeCache(0)->getTrackSize();
std::string uptime;
auto s = UptimeClock::now();
@@ -132,12 +133,22 @@ getCountsJson(Application& app, int minObjectCount)
if (auto shardStore = app.getShardStore())
{
auto shardFamily{dynamic_cast<ShardFamily*>(app.getShardFamily())};
auto const [cacheSz, trackSz] = shardFamily->getTreeNodeCacheSize();
Json::Value& jv = (ret[jss::shards] = Json::objectValue);
jv[jss::fullbelow_size] = shardFamily->getFullBelowCacheSize();
jv[jss::treenode_cache_size] = cacheSz;
jv[jss::treenode_track_size] = trackSz;
auto& caches = (ret["caches"] = Json::arrayValue);
{
shardFamily->forEachFullBelowCache(
[&caches](FullBelowCache const* fbc) {
caches.append(fbc->info());
});
shardFamily->forEachTreeNodeCache(
[&caches](TreeNodeCache const* tnc) {
caches.append(tnc->info());
});
}
ret[jss::write_load] = shardStore->getWriteLoad();
jv[jss::node_writes] = std::to_string(shardStore->getStoreCount());
jv[jss::node_reads_total] = shardStore->getFetchTotalCount();

View File

@@ -116,12 +116,11 @@ doSubmit(RPC::JsonContext& context)
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
auto tpTrans = std::make_shared<Transaction>(stpTrans);
if (tpTrans->getStatus() != NEW)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
jvResult[jss::error_exception] = "fails local checks";
return jvResult;
}
@@ -143,7 +142,8 @@ doSubmit(RPC::JsonContext& context)
try
{
jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
jvResult[jss::tx_json] =
tpTrans->getJson(context.app, JsonOptions::none);
jvResult[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());

View File

@@ -49,7 +49,7 @@ isValidated(LedgerMaster& ledgerMaster, std::uint32_t seq, uint256 const& hash)
struct TxResult
{
Transaction::pointer txn;
std::shared_ptr<Transaction> txn;
std::variant<std::shared_ptr<TxMeta>, Blob> meta;
bool validated = false;
std::optional<std::string> ctid;
@@ -121,10 +121,8 @@ doTxPostgres(RPC::Context& context, TxArgs const& args)
assert(false);
return {res, {rpcINTERNAL, "Error deserializing SHAMap node"}};
}
std::string reason;
res.txn = std::make_shared<Transaction>(sttx, reason, context.app);
res.txn->setLedger(locator.getLedgerSequence());
res.txn->setStatus(COMMITTED);
res.txn = std::make_shared<Transaction>(sttx);
res.txn->setStatus(COMMITTED, locator.getLedgerSequence());
if (args.binary)
{
SerialIter it(item->slice());
@@ -305,7 +303,8 @@ populateJsonResponse(
// no errors
else if (result.txn)
{
response = result.txn->getJson(JsonOptions::include_date, args.binary);
response = result.txn->getJson(
context.app, JsonOptions::include_date, args.binary);
// populate binary metadata
if (auto blob = std::get_if<Blob>(&result.meta))

View File

@@ -63,7 +63,7 @@ doTxHistory(RPC::JsonContext& context)
obj["used_postgres"] = true;
for (auto const& t : trans)
txs.append(t->getJson(JsonOptions::none));
txs.append(t->getJson(context.app, JsonOptions::none));
return obj;
}

View File

@@ -550,23 +550,22 @@ transactionPreProcessImpl(
return transactionPreProcessResult{std::move(stpTrans)};
}
static std::pair<Json::Value, Transaction::pointer>
static std::pair<Json::Value, std::shared_ptr<Transaction>>
transactionConstructImpl(
std::shared_ptr<STTx const> const& stpTrans,
Rules const& rules,
Application& app)
{
std::pair<Json::Value, Transaction::pointer> ret;
std::pair<Json::Value, std::shared_ptr<Transaction>> ret;
// Turn the passed in STTx into a Transaction.
Transaction::pointer tpTrans;
std::shared_ptr<Transaction> tpTrans;
{
std::string reason;
tpTrans = std::make_shared<Transaction>(stpTrans, reason, app);
tpTrans = std::make_shared<Transaction>(stpTrans);
if (tpTrans->getStatus() != NEW)
{
ret.first = RPC::make_error(
rpcINTERNAL, "Unable to construct transaction: " + reason);
ret.first =
RPC::make_error(rpcINTERNAL, "Unable to construct transaction");
return ret;
}
}
@@ -597,9 +596,7 @@ transactionConstructImpl(
return ret;
}
std::string reason;
auto tpTransNew =
std::make_shared<Transaction>(sttxNew, reason, app);
auto tpTransNew = std::make_shared<Transaction>(sttxNew);
if (tpTransNew)
{
@@ -629,12 +626,14 @@ transactionConstructImpl(
}
static Json::Value
transactionFormatResultImpl(Transaction::pointer tpTrans)
transactionFormatResultImpl(
std::shared_ptr<Transaction> tpTrans,
Application& app)
{
Json::Value jvResult;
try
{
jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
jvResult[jss::tx_json] = tpTrans->getJson(app, JsonOptions::none);
jvResult[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
@@ -785,13 +784,13 @@ transactionSign(
else
ledger = app.openLedger().current();
// Make sure the STTx makes a legitimate Transaction.
std::pair<Json::Value, Transaction::pointer> txn =
std::pair<Json::Value, std::shared_ptr<Transaction>> txn =
transactionConstructImpl(preprocResult.second, ledger->rules(), app);
if (!txn.second)
return txn.first;
return transactionFormatResultImpl(txn.second);
return transactionFormatResultImpl(txn.second, app);
}
/** Returns a Json::objectValue. */
@@ -819,7 +818,7 @@ transactionSubmit(
return preprocResult.first;
// Make sure the STTx makes a legitimate Transaction.
std::pair<Json::Value, Transaction::pointer> txn =
std::pair<Json::Value, std::shared_ptr<Transaction>> txn =
transactionConstructImpl(preprocResult.second, ledger->rules(), app);
if (!txn.second)
@@ -837,7 +836,7 @@ transactionSubmit(
rpcINTERNAL, "Exception occurred during transaction submission.");
}
return transactionFormatResultImpl(txn.second);
return transactionFormatResultImpl(txn.second, app);
}
namespace detail {
@@ -1021,13 +1020,13 @@ transactionSignFor(
}
// Make sure the STTx makes a legitimate Transaction.
std::pair<Json::Value, Transaction::pointer> txn =
std::pair<Json::Value, std::shared_ptr<Transaction>> txn =
transactionConstructImpl(sttx, ledger->rules(), app);
if (!txn.second)
return txn.first;
return transactionFormatResultImpl(txn.second);
return transactionFormatResultImpl(txn.second, app);
}
/** Returns a Json::objectValue. */
@@ -1201,7 +1200,7 @@ transactionSubmitMultiSigned(
return err;
// Make sure the SerializedTransaction makes a legitimate Transaction.
std::pair<Json::Value, Transaction::pointer> txn =
std::pair<Json::Value, std::shared_ptr<Transaction>> txn =
transactionConstructImpl(stpTrans, ledger->rules(), app);
if (!txn.second)
@@ -1219,7 +1218,7 @@ transactionSubmitMultiSigned(
rpcINTERNAL, "Exception occurred during transaction submission.");
}
return transactionFormatResultImpl(txn.second);
return transactionFormatResultImpl(txn.second, app);
}
} // namespace RPC

View File

@@ -53,6 +53,9 @@ public:
virtual beast::Journal const&
journal() = 0;
virtual void
resetCacheFor(std::uint32_t ledgerSeq) = 0;
/** Return a pointer to the Family Full Below Cache
@param ledgerSeq ledger sequence determines a corresponding shard cache
@@ -92,9 +95,6 @@ public:
*/
virtual void
missingNodeAcquireByHash(uint256 const& refHash, std::uint32_t refNum) = 0;
virtual void
reset() = 0;
};
} // namespace ripple

View File

@@ -20,31 +20,350 @@
#ifndef RIPPLE_SHAMAP_FULLBELOWCACHE_H_INCLUDED
#define RIPPLE_SHAMAP_FULLBELOWCACHE_H_INCLUDED
#include <ripple/basics/KeyCache.h>
#include <ripple/basics/TaggedCache.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/UnorderedContainers.h>
#include <ripple/basics/base_uint.h>
#include <ripple/basics/hardened_hash.h>
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/insight/Collector.h>
#include <ripple/beast/insight/Insight.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/beast/utility/atomic_shared_ptr.h>
#include <ripple/json/json_value.h>
#include <array>
#include <atomic>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
namespace ripple {
namespace detail {
template <
class Key,
class Hash = hardened_hash<>,
class KeyEqual = std::equal_to<Key>>
class FullBelowCacheImpl
{
public:
using key_type = Key;
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
private:
struct Stats
{
template <class Handler>
Stats(
std::string const& prefix,
Handler const& handler,
beast::insight::Collector::ptr const& collector)
: hook(collector->make_hook(handler))
, size(collector->make_gauge(prefix, "size"))
, hit_rate(collector->make_gauge(prefix, "hit_rate"))
{
}
beast::insight::Hook hook;
beast::insight::Gauge size;
beast::insight::Gauge hit_rate;
};
Hash hash_;
// Used for logging
std::string name_;
beast::WrappedSink sink_;
beast::Journal journal_;
clock_type& clock_;
Stats m_stats;
// Desired number of cache entries (0 = ignore)
std::size_t const targetSize_;
// Desired maximum cache age
std::chrono::seconds const targetAge_;
/** Map partitions:
The idea is to partition the key space into multiple, independent
maps, each with their own lock. This helps to increase concurrency
since it's possible to operate on multiple partitions at a time.
*/
struct Partition
{
std::mutex mutable mutex;
std::size_t const index;
hardened_hash_map<Key, clock_type::time_point, Hash, KeyEqual> items;
Partition(std::size_t i) : index(i)
{
}
};
/** The partitions that, together, map the entire key space. */
std::array<Partition, 64> partitions_;
/** Number of items where we have either a strong or weak pointer to. */
std::atomic<std::size_t> size_ = 0;
/** The number of times that we found an item in the cache */
std::atomic<std::uint64_t> hits_ = 0;
std::atomic<std::uint64_t> misses_ = 0;
Partition&
getPartition(Key const& key) noexcept
{
return partitions_[hash_(key) % partitions_.size()];
}
private:
template <std::size_t... Is>
FullBelowCacheImpl(
std::string name,
std::size_t targetSize,
std::chrono::seconds expiration,
clock_type& clock,
beast::Journal journal,
beast::insight::Collector::ptr const& collector,
std::index_sequence<Is...>)
: name_(std::move(name))
, sink_(journal, "[" + name_ + "] ")
, journal_(sink_)
, clock_(clock)
, m_stats(
name_,
[this]() {
m_stats.size.set(size());
m_stats.hit_rate.set([this]() {
auto const h = hits_.load();
auto const m = misses_.load();
auto ret = h + m;
if (ret != 0)
ret = (h * 100) / ret;
return ret;
}());
},
collector)
, targetSize_(targetSize)
, targetAge_(expiration)
, partitions_{(Is)...}
{
}
public:
FullBelowCacheImpl(
std::string name,
std::size_t targetSize,
std::chrono::seconds expiration,
clock_type& clock,
beast::Journal journal,
beast::insight::Collector::ptr const& collector =
beast::insight::NullCollector::New())
: FullBelowCacheImpl(
std::move(name),
targetSize,
expiration,
clock,
journal,
collector,
std::make_index_sequence<64>{})
{
}
/** Returns the total number cached items. */
std::size_t
size() const
{
return size_.load();
}
/** Returns the name of the FullBelowCacheImpl instance. */
std::string const&
name() const
{
return name_;
}
/** Refresh the last access time on a key if present.
@return `true` If the key was found.
*/
bool
touch_if_exists(Key const& key)
{
auto& p = getPartition(key);
{
std::lock_guard lock(p.mutex);
if (auto const it = p.items.find(key); it != p.items.end())
{
it->second = clock_.now();
++hits_;
return true;
}
}
++misses_;
return false;
}
bool
insert(Key const& key)
{
auto& p = getPartition(key);
std::lock_guard l(p.mutex);
auto const now = clock_.now();
auto [it, inserted] = p.items.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple(now));
if (inserted)
{
size_.fetch_add(1, std::memory_order_relaxed);
return true;
}
it->second = now;
return false;
}
/** Erases all elements that match the predicate. */
void
sweep()
{
// Calculate the expiration time
auto const expire = [this]() {
using namespace std::chrono_literals;
auto exp = targetAge_;
// If the size of the cache exceeds the target size, then we adjust
// the expiry timeout down to prune data faster.
if (exp != std::chrono::seconds::zero() && targetSize_ != 0)
{
if (auto const size = size_.load(); size > targetSize_)
exp = exp * targetSize_ / size;
}
return std::max<std::chrono::seconds>(1s, exp);
}();
auto eraser = [this, expire](
Partition& partition,
clock_type::time_point const& now) {
std::size_t decTotal = 0;
std::lock_guard lock(partition.mutex);
auto it = partition.items.begin();
while (it != partition.items.end())
{
if (it->second + expire <= now)
{
++decTotal;
it = partition.items.erase(it);
continue;
}
// This item is fine.
++it;
}
if (decTotal)
size_.fetch_sub(decTotal, std::memory_order_relaxed);
};
auto const start = clock_.now();
// For systems with a small number of cores always use the single
// threaded algorithm.
if (std::thread::hardware_concurrency() <= 4)
{
for (auto& p : partitions_)
eraser(p, start);
}
else
{
// We want to limit the number of threads to avoid resource
// starvation.
std::array<std::thread, 4> threads{};
for (std::size_t i = 0; i != threads.size(); ++i)
{
threads[i] = std::thread(
[this, start, &eraser, step = threads.size()](
std::size_t j) {
while (j < partitions_.size())
{
eraser(partitions_[j], start);
j += step;
}
},
i);
}
for (auto& t : threads)
t.join();
}
if (auto const d = clock_.now() - start; d >= std::chrono::seconds(2))
{
auto const secs =
std::chrono::duration_cast<std::chrono::seconds>(d);
auto const usecs =
std::chrono::duration_cast<std::chrono::milliseconds>(d) -
std::chrono::duration_cast<std::chrono::milliseconds>(secs);
JLOG(journal_.info())
<< "sweep: Iteration over " << size_.load() << " items took "
<< secs.count() << "." << usecs.count() << " seconds.";
}
}
Json::Value
info() const
{
Json::Value v{Json::objectValue};
v["name"] = name_;
v["partitions"] = static_cast<std::uint32_t>(partitions_.size());
v["total_size"] = std::to_string(size_.load());
v["cache_hits"] = std::to_string(hits_.load());
v["cache_misses"] = std::to_string(misses_.load());
v["target_size"] = std::to_string(targetSize_);
v["target_age"] = std::to_string(targetAge_.count());
return v;
}
};
} // namespace detail
/** Remembers which tree keys have all descendants resident.
This optimizes the process of acquiring a complete tree.
*/
class BasicFullBelowCache
class FullBelowCache
{
private:
using CacheType = KeyCache;
using KeyCache = detail::FullBelowCacheImpl<uint256>;
public:
enum { defaultCacheTargetSize = 0 };
using key_type = uint256;
using clock_type = typename CacheType::clock_type;
using key_type = KeyCache::key_type;
using clock_type = KeyCache::clock_type;
/** Construct the cache.
@@ -53,23 +372,22 @@ public:
@param targetSize The cache target size.
@param targetExpirationSeconds The expiration time for items.
*/
BasicFullBelowCache(
FullBelowCache(
std::string const& name,
clock_type& clock,
beast::Journal j,
std::size_t targetSize,
std::chrono::seconds expiration = std::chrono::minutes{2},
beast::insight::Collector::ptr const& collector =
beast::insight::NullCollector::New(),
std::size_t target_size = defaultCacheTargetSize,
std::chrono::seconds expiration = std::chrono::minutes{2})
: m_cache(name, target_size, expiration, clock, j, collector), m_gen(1)
beast::insight::NullCollector::New())
: name_(name)
, clock_(clock)
, journal_(j)
, collector_(collector)
, targetSize_(targetSize)
, expiration_(expiration)
{
}
/** Return the clock associated with the cache. */
clock_type&
clock()
{
return m_cache.clock();
clear();
}
/** Return the number of elements in the cache.
@@ -79,29 +397,39 @@ public:
std::size_t
size() const
{
return m_cache.size();
if (auto c = cache_.load())
return c->size();
return 0;
}
/** Remove expired cache items.
Thread safety:
Safe to call from any thread.
*/
void
sweep()
{
m_cache.sweep();
if (auto c = cache_.load())
c->sweep();
}
/** Refresh the last access time of an item, if it exists.
Thread safety:
Safe to call from any thread.
@param key The key to refresh.
@return `true` If the key exists.
*/
bool
touch_if_exists(key_type const& key)
{
return m_cache.touch_if_exists(key);
if (auto c = cache_.load())
return c->touch_if_exists(key);
return false;
}
/** Insert a key into the cache.
@@ -114,39 +442,75 @@ public:
void
insert(key_type const& key)
{
m_cache.insert(key);
if (auto c = cache_.load())
c->insert(key);
}
/** generation determines whether cached entry is valid */
/** Returns the generation that can determine if a cached entry is valid. */
std::uint32_t
getGeneration(void) const
generation() const
{
return m_gen;
return generation_;
}
/** Clears the cache.
This effectively replaces the cache with an entirely new instance, and
destroys the old instance.
It is safe to call this method from any thread.
*/
void
clear()
{
m_cache.clear();
++m_gen;
auto c = cache_.load();
++generation_;
cache_.store(std::make_shared<KeyCache>(
"FullBelow: " + name_,
targetSize_,
expiration_,
clock_,
journal_,
collector_));
if (c)
{
std::thread t(
[](std::shared_ptr<KeyCache> cache) {
// just invoke the destructor of cache
},
std::move(c));
t.detach();
}
}
void
reset()
Json::Value
info() const
{
m_cache.clear();
m_gen = 1;
if (auto c = cache_.load())
{
auto ret = c->info();
ret["generation"] = std::to_string(generation_);
return ret;
}
return {Json::objectValue};
}
private:
CacheType m_cache;
std::atomic<std::uint32_t> m_gen;
std::string const name_;
clock_type& clock_;
beast::Journal journal_;
beast::insight::Collector::ptr collector_;
std::size_t const targetSize_;
std::chrono::seconds const expiration_;
std::atomic<std::uint32_t> generation_ = 0;
std::atomic<std::shared_ptr<KeyCache>> cache_;
};
} // namespace detail
using FullBelowCache = detail::BasicFullBelowCache;
} // namespace ripple
#endif

View File

@@ -66,6 +66,11 @@ public:
return false;
}
void resetCacheFor(std::uint32_t) override
{
return;
}
std::shared_ptr<FullBelowCache> getFullBelowCache(std::uint32_t) override
{
return fbCache_;
@@ -79,9 +84,6 @@ public:
void
sweep() override;
void
reset() override;
void
missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override;
@@ -105,6 +107,9 @@ private:
void
acquire(uint256 const& hash, std::uint32_t seq);
void
initCaches();
};
} // namespace ripple

View File

@@ -32,7 +32,6 @@
#include <ripple/shamap/SHAMapLeafNode.h>
#include <ripple/shamap/SHAMapMissingNode.h>
#include <ripple/shamap/SHAMapTreeNode.h>
#include <ripple/shamap/TreeNodeCache.h>
#include <cassert>
#include <stack>
#include <vector>

View File

@@ -66,28 +66,18 @@ public:
return true;
}
void
resetCacheFor(std::uint32_t ledgerSeq) override;
std::shared_ptr<FullBelowCache>
getFullBelowCache(std::uint32_t ledgerSeq) override;
/** Return the number of entries in the cache */
int
getFullBelowCacheSize();
std::shared_ptr<TreeNodeCache>
getTreeNodeCache(std::uint32_t ledgerSeq) override;
/** Return a pair where the first item is the number of items cached
and the second item is the number of entries in the cached
*/
std::pair<int, int>
getTreeNodeCacheSize();
void
sweep() override;
void
reset() override;
void
missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
override;
@@ -98,6 +88,24 @@ public:
acquire(hash, seq);
}
template <class Func>
void
forEachFullBelowCache(Func&& f)
{
std::lock_guard lock(fbCacheMutex_);
for (auto const& e : fbCache_)
f(e.second.get());
}
template <class Func>
void
forEachTreeNodeCache(Func&& f)
{
std::lock_guard lock(tnCacheMutex_);
for (auto const& e : tnCache_)
f(e.second.get());
}
private:
Application& app_;
NodeStore::Database& db_;

View File

@@ -26,24 +26,9 @@
namespace ripple {
NodeFamily::NodeFamily(Application& app, CollectorManager& cm)
: app_(app)
, db_(app.getNodeStore())
, j_(app.journal("NodeFamily"))
, fbCache_(std::make_shared<FullBelowCache>(
"Node family full below cache",
stopwatch(),
app.journal("NodeFamilyFulLBelowCache"),
cm.collector(),
fullBelowTargetSize,
fullBelowExpiration))
, tnCache_(std::make_shared<TreeNodeCache>(
"Node family tree node cache",
app.config().getValueFor(SizedItem::treeCacheSize),
std::chrono::seconds(
app.config().getValueFor(SizedItem::treeCacheAge)),
stopwatch(),
j_))
: app_(app), db_(app.getNodeStore()), j_(app.journal("NodeFamily"))
{
initCaches();
}
void
@@ -54,21 +39,22 @@ NodeFamily::sweep()
}
void
NodeFamily::reset()
NodeFamily::acquire(uint256 const& hash, std::uint32_t seq)
{
if (hash.isNonZero())
{
std::lock_guard lock(maxSeqMutex_);
maxSeq_ = 0;
}
JLOG(j_.error()) << "Missing node in " << to_string(hash);
fbCache_->reset();
tnCache_->reset();
app_.getInboundLedgers().acquire(
hash, seq, InboundLedger::Reason::GENERIC);
}
}
void
NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
{
JLOG(j_.error()) << "Missing node in " << seq;
if (app_.config().reporting())
{
std::stringstream ss;
@@ -103,15 +89,23 @@ NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
}
void
NodeFamily::acquire(uint256 const& hash, std::uint32_t seq)
NodeFamily::initCaches()
{
if (hash.isNonZero())
{
JLOG(j_.error()) << "Missing node in " << to_string(hash);
fbCache_ = std::make_shared<FullBelowCache>(
"NodeFamily",
stopwatch(),
j_,
fullBelowTargetSize,
fullBelowExpiration,
app_.getCollectorManager().collector());
app_.getInboundLedgers().acquire(
hash, seq, InboundLedger::Reason::GENERIC);
}
tnCache_ = std::make_shared<TreeNodeCache>(
"TreeNodeCache: NodeFamily",
app_.config().getValueFor(SizedItem::treeCacheSize),
std::chrono::seconds(
app_.config().getValueFor(SizedItem::treeCacheAge)),
stopwatch(),
j_);
}
} // namespace ripple

View File

@@ -1177,7 +1177,7 @@ SHAMap::canonicalize(
assert(node->getHash() == hash);
f_.getTreeNodeCache(ledgerSeq_)
->canonicalize_replace_client(hash.as_uint256(), node);
->retrieve_or_insert(hash.as_uint256(), node);
}
void

View File

@@ -323,7 +323,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
max,
filter,
512, // number of async reads per pass
f_.getFullBelowCache(ledgerSeq_)->getGeneration());
f_.getFullBelowCache(ledgerSeq_)->generation());
if (!root_->isInner() ||
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
@@ -579,7 +579,7 @@ SHAMap::addKnownNode(
return SHAMapAddNode::duplicate();
}
auto const generation = f_.getFullBelowCache(ledgerSeq_)->getGeneration();
auto const generation = f_.getFullBelowCache(ledgerSeq_)->generation();
SHAMapNodeID iNodeID;
auto iNode = root_.get();

View File

@@ -44,6 +44,39 @@ ShardFamily::ShardFamily(Application& app, CollectorManager& cm)
{
}
void
ShardFamily::resetCacheFor(std::uint32_t ledgerSeq)
{
auto const shardIndex = app_.getShardStore()->seqToShardIndex(ledgerSeq);
{ // Destroy the cache without a lock
std::shared_ptr<FullBelowCache> fbc;
{
std::lock_guard lock(fbCacheMutex_);
if (auto const it = fbCache_.find(shardIndex); it != fbCache_.end())
{
fbc = std::move(it->second);
fbCache_.erase(it);
}
}
}
{ // Destroy the cache without a lock
std::shared_ptr<TreeNodeCache> tnc;
{
std::lock_guard lock(tnCacheMutex_);
if (auto const it = tnCache_.find(shardIndex); it != tnCache_.end())
{
tnc = std::move(it->second);
tnCache_.erase(it);
}
}
}
}
std::shared_ptr<FullBelowCache>
ShardFamily::getFullBelowCache(std::uint32_t ledgerSeq)
{
@@ -54,25 +87,15 @@ ShardFamily::getFullBelowCache(std::uint32_t ledgerSeq)
// Create a cache for the corresponding shard
auto fbCache{std::make_shared<FullBelowCache>(
"Shard family full below cache shard " + std::to_string(shardIndex),
"Shard #" + std::to_string(shardIndex),
stopwatch(),
j_,
cm_.collector(),
fullBelowTargetSize,
fullBelowExpiration)};
fullBelowExpiration,
cm_.collector())};
return fbCache_.emplace(shardIndex, std::move(fbCache)).first->second;
}
int
ShardFamily::getFullBelowCacheSize()
{
size_t sz{0};
std::lock_guard lock(fbCacheMutex_);
for (auto const& e : fbCache_)
sz += e.second->size();
return sz;
}
std::shared_ptr<TreeNodeCache>
ShardFamily::getTreeNodeCache(std::uint32_t ledgerSeq)
{
@@ -91,20 +114,6 @@ ShardFamily::getTreeNodeCache(std::uint32_t ledgerSeq)
return tnCache_.emplace(shardIndex, std::move(tnCache)).first->second;
}
std::pair<int, int>
ShardFamily::getTreeNodeCacheSize()
{
int cacheSz{0};
int trackSz{0};
std::lock_guard lock(tnCacheMutex_);
for (auto const& e : tnCache_)
{
cacheSz += e.second->getCacheSize();
trackSz += e.second->getTrackSize();
}
return {cacheSz, trackSz};
}
void
ShardFamily::sweep()
{
@@ -135,23 +144,6 @@ ShardFamily::sweep()
}
}
void
ShardFamily::reset()
{
{
std::lock_guard lock(maxSeqMutex_);
maxSeq_ = 0;
}
{
std::lock_guard lock(fbCacheMutex_);
fbCache_.clear();
}
std::lock_guard lock(tnCacheMutex_);
tnCache_.clear();
}
void
ShardFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
{

View File

@@ -1,99 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/basics/TaggedCache.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/clock/manual_clock.h>
#include <ripple/beast/unit_test.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/protocol/Protocol.h>
#include <test/unit_test/SuiteJournal.h>
namespace ripple {
class KeyCache_test : public beast::unit_test::suite
{
public:
void
run() override
{
using namespace std::chrono_literals;
TestStopwatch clock;
clock.set(0);
using Key = std::string;
using Cache = TaggedCache<Key, int, true>;
test::SuiteJournal j("KeyCacheTest", *this);
// Insert an item, retrieve it, and age it so it gets purged.
{
Cache c("test", LedgerIndex(1), 2s, clock, j);
BEAST_EXPECT(c.size() == 0);
BEAST_EXPECT(c.insert("one"));
BEAST_EXPECT(!c.insert("one"));
BEAST_EXPECT(c.size() == 1);
BEAST_EXPECT(c.touch_if_exists("one"));
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 1);
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 0);
BEAST_EXPECT(!c.touch_if_exists("one"));
}
// Insert two items, have one expire
{
Cache c("test", LedgerIndex(2), 2s, clock, j);
BEAST_EXPECT(c.insert("one"));
BEAST_EXPECT(c.size() == 1);
BEAST_EXPECT(c.insert("two"));
BEAST_EXPECT(c.size() == 2);
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 2);
BEAST_EXPECT(c.touch_if_exists("two"));
++clock;
c.sweep();
BEAST_EXPECT(c.size() == 1);
}
// Insert three items (1 over limit), sweep
{
Cache c("test", LedgerIndex(2), 3s, clock, j);
BEAST_EXPECT(c.insert("one"));
++clock;
BEAST_EXPECT(c.insert("two"));
++clock;
BEAST_EXPECT(c.insert("three"));
++clock;
BEAST_EXPECT(c.size() == 3);
c.sweep();
BEAST_EXPECT(c.size() < 3);
}
}
};
BEAST_DEFINE_TESTSUITE(KeyCache, common, ripple);
} // namespace ripple

View File

@@ -51,9 +51,8 @@ public:
using Key = LedgerIndex;
using Value = std::string;
using Cache = TaggedCache<Key, Value>;
Cache c("test", 1, 1s, clock, journal);
TaggedCache<Key, Value> c("test", 1, 1s, clock, journal);
// Insert an item, retrieve it, and age it so it gets purged.
{
@@ -63,12 +62,6 @@ public:
BEAST_EXPECT(c.getCacheSize() == 1);
BEAST_EXPECT(c.getTrackSize() == 1);
{
std::string s;
BEAST_EXPECT(c.retrieve(1, s));
BEAST_EXPECT(s == "one");
}
++clock;
c.sweep();
BEAST_EXPECT(c.getCacheSize() == 0);
@@ -105,7 +98,7 @@ public:
{
auto const p1 = c.fetch(3);
auto p2 = std::make_shared<Value>("three");
c.canonicalize_replace_client(3, p2);
c.retrieve_or_insert(3, p2);
BEAST_EXPECT(p1.get() == p2.get());
}
++clock;
@@ -136,7 +129,7 @@ public:
BEAST_EXPECT(c.getTrackSize() == 1);
// Canonicalize a new object with the same key
auto p2 = std::make_shared<std::string>("four");
BEAST_EXPECT(c.canonicalize_replace_client(4, p2));
BEAST_EXPECT(c.retrieve_or_insert(4, p2));
BEAST_EXPECT(c.getCacheSize() == 1);
BEAST_EXPECT(c.getTrackSize() == 1);
// Make sure we get the original object

View File

@@ -32,6 +32,8 @@ namespace tests {
class TestNodeFamily : public Family
{
private:
beast::Journal const j_;
std::unique_ptr<NodeStore::Database> db_;
std::shared_ptr<FullBelowCache> fbCache_;
@@ -40,21 +42,28 @@ private:
TestStopwatch clock_;
NodeStore::DummyScheduler scheduler_;
beast::Journal const j_;
private:
std::shared_ptr<TreeNodeCache>
initTreeNodeCache()
{
return std::make_shared<TreeNodeCache>(
"App family tree node cache",
65536,
std::chrono::minutes{1},
clock_,
j_);
}
public:
TestNodeFamily(beast::Journal j)
: fbCache_(std::make_shared<FullBelowCache>(
: j_(j)
, fbCache_(std::make_shared<FullBelowCache>(
"App family full below cache",
clock_,
j))
, tnCache_(std::make_shared<TreeNodeCache>(
"App family tree node cache",
65536,
std::chrono::minutes{1},
clock_,
j))
, j_(j)
j,
100000,
std::chrono::minutes{2}))
, tnCache_(initTreeNodeCache())
{
Section testSection;
testSection.set("type", "memory");
@@ -81,6 +90,11 @@ public:
return j_;
}
void resetCacheFor(std::uint32_t) override
{
(void)0;
}
std::shared_ptr<FullBelowCache> getFullBelowCache(std::uint32_t) override
{
return fbCache_;
@@ -118,13 +132,6 @@ public:
Throw<std::runtime_error>("missing node");
}
void
reset() override
{
fbCache_->reset();
tnCache_->reset();
}
beast::manual_clock<std::chrono::steady_clock>
clock()
{