mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-05 16:57:56 +00:00
Reduce lock contention in manifest cache:
This commit combines the `apply_mutex` and `read_mutex` into a single `mutex_` var. This new `mutex_` var is a `shared_mutex`, and most operations only need to lock it with a `shared_lock`. The only exception is `applyMutex`, which may need a `unique_lock`. One consequence of removing the `apply_mutex` is more than one `applyMutex` function can run at the same time. To help reduce lock contention that a `unique_lock` would cause, checks that only require reading data are run a `shared_lock` (call these the "prewriteChecks"), then the lock is released, then a `unique_lock` is acquired. Since a currently running `applyManifest` may write data between the time a `shared_lock` is released and the `write_lock` is acquired, the "prewriteChecks" need to be rerun. Duplicating this work isn't ideal, but the "prewirteChecks" are relatively inexpensive. A couple of other designs were considered. We could restrict more than one `applyMutex` function from running concurrently - either with a `applyMutex` or my setting the max number of manifest jobs on the job queue to one. The biggest issue with this is if any other function ever adds a write lock for any reason, `applyManifest` would not be broken - data could be written between the release of the `shared_lock` and the acquisition of the `unique_lock`. Note: it is tempting to solve this problem by not releasing the `shared_mutex` and simply upgrading the lock. In the presence of concurrently running `applyManifest` functions, this will deadlock (both function need to wait for the other to release their read locks before they can acquire a write lock).
This commit is contained in:
@@ -24,7 +24,9 @@
|
|||||||
#include <ripple/beast/utility/Journal.h>
|
#include <ripple/beast/utility/Journal.h>
|
||||||
#include <ripple/protocol/PublicKey.h>
|
#include <ripple/protocol/PublicKey.h>
|
||||||
#include <ripple/protocol/SecretKey.h>
|
#include <ripple/protocol/SecretKey.h>
|
||||||
|
|
||||||
#include <optional>
|
#include <optional>
|
||||||
|
#include <shared_mutex>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
@@ -223,9 +225,8 @@ class DatabaseCon;
|
|||||||
class ManifestCache
|
class ManifestCache
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
beast::Journal mutable j_;
|
beast::Journal j_;
|
||||||
std::mutex apply_mutex_;
|
std::shared_mutex mutable mutex_;
|
||||||
std::mutex mutable read_mutex_;
|
|
||||||
|
|
||||||
/** Active manifests stored by master public key. */
|
/** Active manifests stored by master public key. */
|
||||||
hash_map<PublicKey, Manifest> map_;
|
hash_map<PublicKey, Manifest> map_;
|
||||||
@@ -378,8 +379,10 @@ public:
|
|||||||
|
|
||||||
/** Invokes the callback once for every populated manifest.
|
/** Invokes the callback once for every populated manifest.
|
||||||
|
|
||||||
@note Undefined behavior results when calling ManifestCache members from
|
@note Do not call ManifestCache member functions from within the
|
||||||
within the callback
|
callback. This can re-lock the mutex from the same thread, which is UB.
|
||||||
|
@note Do not write ManifestCache member variables from within the
|
||||||
|
callback. This can lead to data races.
|
||||||
|
|
||||||
@param f Function called for each manifest
|
@param f Function called for each manifest
|
||||||
|
|
||||||
@@ -391,7 +394,7 @@ public:
|
|||||||
void
|
void
|
||||||
for_each_manifest(Function&& f) const
|
for_each_manifest(Function&& f) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
for (auto const& [_, manifest] : map_)
|
for (auto const& [_, manifest] : map_)
|
||||||
{
|
{
|
||||||
(void)_;
|
(void)_;
|
||||||
@@ -401,8 +404,10 @@ public:
|
|||||||
|
|
||||||
/** Invokes the callback once for every populated manifest.
|
/** Invokes the callback once for every populated manifest.
|
||||||
|
|
||||||
@note Undefined behavior results when calling ManifestCache members from
|
@note Do not call ManifestCache member functions from within the
|
||||||
within the callback
|
callback. This can re-lock the mutex from the same thread, which is UB.
|
||||||
|
@note Do not write ManifestCache member variables from
|
||||||
|
within the callback. This can lead to data races.
|
||||||
|
|
||||||
@param pf Pre-function called with the maximum number of times f will be
|
@param pf Pre-function called with the maximum number of times f will be
|
||||||
called (useful for memory allocations)
|
called (useful for memory allocations)
|
||||||
@@ -417,7 +422,7 @@ public:
|
|||||||
void
|
void
|
||||||
for_each_manifest(PreFun&& pf, EachFun&& f) const
|
for_each_manifest(PreFun&& pf, EachFun&& f) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
pf(map_.size());
|
pf(map_.size());
|
||||||
for (auto const& [_, manifest] : map_)
|
for (auto const& [_, manifest] : map_)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -28,8 +28,11 @@
|
|||||||
#include <ripple/json/json_reader.h>
|
#include <ripple/json/json_reader.h>
|
||||||
#include <ripple/protocol/PublicKey.h>
|
#include <ripple/protocol/PublicKey.h>
|
||||||
#include <ripple/protocol/Sign.h>
|
#include <ripple/protocol/Sign.h>
|
||||||
|
|
||||||
#include <boost/algorithm/string/trim.hpp>
|
#include <boost/algorithm/string/trim.hpp>
|
||||||
|
|
||||||
#include <numeric>
|
#include <numeric>
|
||||||
|
#include <shared_mutex>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
@@ -283,7 +286,7 @@ loadValidatorToken(std::vector<std::string> const& blob)
|
|||||||
PublicKey
|
PublicKey
|
||||||
ManifestCache::getSigningKey(PublicKey const& pk) const
|
ManifestCache::getSigningKey(PublicKey const& pk) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
auto const iter = map_.find(pk);
|
auto const iter = map_.find(pk);
|
||||||
|
|
||||||
if (iter != map_.end() && !iter->second.revoked())
|
if (iter != map_.end() && !iter->second.revoked())
|
||||||
@@ -295,7 +298,7 @@ ManifestCache::getSigningKey(PublicKey const& pk) const
|
|||||||
PublicKey
|
PublicKey
|
||||||
ManifestCache::getMasterKey(PublicKey const& pk) const
|
ManifestCache::getMasterKey(PublicKey const& pk) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
|
|
||||||
if (auto const iter = signingToMasterKeys_.find(pk);
|
if (auto const iter = signingToMasterKeys_.find(pk);
|
||||||
iter != signingToMasterKeys_.end())
|
iter != signingToMasterKeys_.end())
|
||||||
@@ -307,7 +310,7 @@ ManifestCache::getMasterKey(PublicKey const& pk) const
|
|||||||
std::optional<std::uint32_t>
|
std::optional<std::uint32_t>
|
||||||
ManifestCache::getSequence(PublicKey const& pk) const
|
ManifestCache::getSequence(PublicKey const& pk) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
auto const iter = map_.find(pk);
|
auto const iter = map_.find(pk);
|
||||||
|
|
||||||
if (iter != map_.end() && !iter->second.revoked())
|
if (iter != map_.end() && !iter->second.revoked())
|
||||||
@@ -319,7 +322,7 @@ ManifestCache::getSequence(PublicKey const& pk) const
|
|||||||
std::optional<std::string>
|
std::optional<std::string>
|
||||||
ManifestCache::getDomain(PublicKey const& pk) const
|
ManifestCache::getDomain(PublicKey const& pk) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
auto const iter = map_.find(pk);
|
auto const iter = map_.find(pk);
|
||||||
|
|
||||||
if (iter != map_.end() && !iter->second.revoked())
|
if (iter != map_.end() && !iter->second.revoked())
|
||||||
@@ -331,7 +334,7 @@ ManifestCache::getDomain(PublicKey const& pk) const
|
|||||||
std::optional<std::string>
|
std::optional<std::string>
|
||||||
ManifestCache::getManifest(PublicKey const& pk) const
|
ManifestCache::getManifest(PublicKey const& pk) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
auto const iter = map_.find(pk);
|
auto const iter = map_.find(pk);
|
||||||
|
|
||||||
if (iter != map_.end() && !iter->second.revoked())
|
if (iter != map_.end() && !iter->second.revoked())
|
||||||
@@ -343,7 +346,7 @@ ManifestCache::getManifest(PublicKey const& pk) const
|
|||||||
bool
|
bool
|
||||||
ManifestCache::revoked(PublicKey const& pk) const
|
ManifestCache::revoked(PublicKey const& pk) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{read_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
auto const iter = map_.find(pk);
|
auto const iter = map_.find(pk);
|
||||||
|
|
||||||
if (iter != map_.end())
|
if (iter != map_.end())
|
||||||
@@ -355,86 +358,115 @@ ManifestCache::revoked(PublicKey const& pk) const
|
|||||||
ManifestDisposition
|
ManifestDisposition
|
||||||
ManifestCache::applyManifest(Manifest m)
|
ManifestCache::applyManifest(Manifest m)
|
||||||
{
|
{
|
||||||
std::lock_guard applyLock{apply_mutex_};
|
// Check the manifest against the conditions that do not require a
|
||||||
|
// `unique_lock` (write lock) on the `mutex_`. Since the signature can be
|
||||||
|
// relatively expensive, the `checkSignature` parameter determines if the
|
||||||
|
// signature should be checked. Since `prewriteCheck` is run twice (see
|
||||||
|
// comment below), `checkSignature` only needs to be set to true on the
|
||||||
|
// first run.
|
||||||
|
auto prewriteCheck =
|
||||||
|
[this, &m](auto const& iter, bool checkSignature, auto const& lock)
|
||||||
|
-> std::optional<ManifestDisposition> {
|
||||||
|
assert(lock.owns_lock());
|
||||||
|
(void)lock; // not used. parameter is present to ensure the mutex is
|
||||||
|
// locked when the lambda is called.
|
||||||
|
if (iter != map_.end() && m.sequence <= iter->second.sequence)
|
||||||
|
{
|
||||||
|
// We received a manifest whose sequence number is not strictly
|
||||||
|
// greater than the one we already know about. This can happen in
|
||||||
|
// several cases including when we receive manifests from a peer who
|
||||||
|
// doesn't have the latest data.
|
||||||
|
if (auto stream = j_.debug())
|
||||||
|
logMftAct(
|
||||||
|
stream,
|
||||||
|
"Stale",
|
||||||
|
m.masterKey,
|
||||||
|
m.sequence,
|
||||||
|
iter->second.sequence);
|
||||||
|
return ManifestDisposition::stale;
|
||||||
|
}
|
||||||
|
|
||||||
// Before we spend time checking the signature, make sure the
|
if (checkSignature && !m.verify())
|
||||||
// sequence number is newer than any we have.
|
{
|
||||||
auto const iter = map_.find(m.masterKey);
|
if (auto stream = j_.warn())
|
||||||
|
logMftAct(stream, "Invalid", m.masterKey, m.sequence);
|
||||||
|
return ManifestDisposition::invalid;
|
||||||
|
}
|
||||||
|
|
||||||
if (iter != map_.end() && m.sequence <= iter->second.sequence)
|
// If the master key associated with a manifest is or might be
|
||||||
{
|
// compromised and is, therefore, no longer trustworthy.
|
||||||
// We received a manifest whose sequence number is not strictly greater
|
//
|
||||||
// than the one we already know about. This can happen in several cases
|
// A manifest revocation essentially marks a manifest as compromised. By
|
||||||
// including when we receive manifests from a peer who doesn't have the
|
// setting the sequence number to the highest value possible, the
|
||||||
// latest data.
|
// manifest is effectively neutered and cannot be superseded by a forged
|
||||||
if (auto stream = j_.debug())
|
// one.
|
||||||
logMftAct(
|
bool const revoked = m.revoked();
|
||||||
stream,
|
|
||||||
"Stale",
|
|
||||||
m.masterKey,
|
|
||||||
m.sequence,
|
|
||||||
iter->second.sequence);
|
|
||||||
return ManifestDisposition::stale;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now check the signature
|
if (auto stream = j_.warn(); stream && revoked)
|
||||||
if (!m.verify())
|
logMftAct(stream, "Revoked", m.masterKey, m.sequence);
|
||||||
{
|
|
||||||
if (auto stream = j_.warn())
|
|
||||||
logMftAct(stream, "Invalid", m.masterKey, m.sequence);
|
|
||||||
return ManifestDisposition::invalid;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the master key associated with a manifest is or might be compromised
|
// Sanity check: the master key of this manifest should not be used as
|
||||||
// and is, therefore, no longer trustworthy.
|
// the ephemeral key of another manifest:
|
||||||
//
|
if (auto const x = signingToMasterKeys_.find(m.masterKey);
|
||||||
// A manifest revocation essentially marks a manifest as compromised. By
|
|
||||||
// setting the sequence number to the highest value possible, the manifest
|
|
||||||
// is effectively neutered and cannot be superseded by a forged one.
|
|
||||||
bool const revoked = m.revoked();
|
|
||||||
|
|
||||||
if (auto stream = j_.warn(); stream && revoked)
|
|
||||||
logMftAct(stream, "Revoked", m.masterKey, m.sequence);
|
|
||||||
|
|
||||||
std::lock_guard readLock{read_mutex_};
|
|
||||||
|
|
||||||
// Sanity check: the master key of this manifest should not be used as
|
|
||||||
// the ephemeral key of another manifest:
|
|
||||||
if (auto const x = signingToMasterKeys_.find(m.masterKey);
|
|
||||||
x != signingToMasterKeys_.end())
|
|
||||||
{
|
|
||||||
JLOG(j_.warn()) << to_string(m)
|
|
||||||
<< ": Master key already used as ephemeral key for "
|
|
||||||
<< toBase58(TokenType::NodePublic, x->second);
|
|
||||||
|
|
||||||
return ManifestDisposition::badMasterKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!revoked)
|
|
||||||
{
|
|
||||||
// Sanity check: the ephemeral key of this manifest should not be used
|
|
||||||
// as the master or ephemeral key of another manifest:
|
|
||||||
if (auto const x = signingToMasterKeys_.find(m.signingKey);
|
|
||||||
x != signingToMasterKeys_.end())
|
x != signingToMasterKeys_.end())
|
||||||
{
|
{
|
||||||
JLOG(j_.warn())
|
JLOG(j_.warn()) << to_string(m)
|
||||||
<< to_string(m)
|
<< ": Master key already used as ephemeral key for "
|
||||||
<< ": Ephemeral key already used as ephemeral key for "
|
<< toBase58(TokenType::NodePublic, x->second);
|
||||||
<< toBase58(TokenType::NodePublic, x->second);
|
|
||||||
|
|
||||||
return ManifestDisposition::badEphemeralKey;
|
return ManifestDisposition::badMasterKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (auto const x = map_.find(m.signingKey); x != map_.end())
|
if (!revoked)
|
||||||
{
|
{
|
||||||
JLOG(j_.warn())
|
// Sanity check: the ephemeral key of this manifest should not be
|
||||||
<< to_string(m) << ": Ephemeral key used as master key for "
|
// used as the master or ephemeral key of another manifest:
|
||||||
<< to_string(x->second);
|
if (auto const x = signingToMasterKeys_.find(m.signingKey);
|
||||||
|
x != signingToMasterKeys_.end())
|
||||||
|
{
|
||||||
|
JLOG(j_.warn())
|
||||||
|
<< to_string(m)
|
||||||
|
<< ": Ephemeral key already used as ephemeral key for "
|
||||||
|
<< toBase58(TokenType::NodePublic, x->second);
|
||||||
|
|
||||||
return ManifestDisposition::badEphemeralKey;
|
return ManifestDisposition::badEphemeralKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto const x = map_.find(m.signingKey); x != map_.end())
|
||||||
|
{
|
||||||
|
JLOG(j_.warn())
|
||||||
|
<< to_string(m) << ": Ephemeral key used as master key for "
|
||||||
|
<< to_string(x->second);
|
||||||
|
|
||||||
|
return ManifestDisposition::badEphemeralKey;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return std::nullopt;
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
std::shared_lock sl{mutex_};
|
||||||
|
if (auto d =
|
||||||
|
prewriteCheck(map_.find(m.masterKey), /*checkSig*/ true, sl))
|
||||||
|
return *d;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_lock sl{mutex_};
|
||||||
|
auto const iter = map_.find(m.masterKey);
|
||||||
|
// Since we released the previously held read lock, it's possible that the
|
||||||
|
// collections have been written to. This means we need to run
|
||||||
|
// `prewriteCheck` again. This re-does work, but `prewriteCheck` is
|
||||||
|
// relatively inexpensive to run, and doing it this way allows us to run
|
||||||
|
// `prewriteCheck` under a `shared_lock` above.
|
||||||
|
// Note, the signature has already been checked above, so it
|
||||||
|
// doesn't need to happen again (signature checks are somewhat expensive).
|
||||||
|
// Note: It's a mistake to use an upgradable lock. This is a recipe for
|
||||||
|
// deadlock.
|
||||||
|
if (auto d = prewriteCheck(iter, /*checkSig*/ false, sl))
|
||||||
|
return *d;
|
||||||
|
|
||||||
|
bool const revoked = m.revoked();
|
||||||
// This is the first manifest we are seeing for a master key. This should
|
// This is the first manifest we are seeing for a master key. This should
|
||||||
// only ever happen once per validator run.
|
// only ever happen once per validator run.
|
||||||
if (iter == map_.end())
|
if (iter == map_.end())
|
||||||
@@ -543,7 +575,7 @@ ManifestCache::save(
|
|||||||
std::string const& dbTable,
|
std::string const& dbTable,
|
||||||
std::function<bool(PublicKey const&)> const& isTrusted)
|
std::function<bool(PublicKey const&)> const& isTrusted)
|
||||||
{
|
{
|
||||||
std::lock_guard lock{apply_mutex_};
|
std::shared_lock lock{mutex_};
|
||||||
auto db = dbCon.checkoutDb();
|
auto db = dbCon.checkoutDb();
|
||||||
|
|
||||||
saveManifests(*db, dbTable, isTrusted, map_, j_);
|
saveManifests(*db, dbTable, isTrusted, map_, j_);
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ enum JobType {
|
|||||||
jtRPC, // A websocket command from the client
|
jtRPC, // A websocket command from the client
|
||||||
jtSWEEP, // Sweep for stale structures
|
jtSWEEP, // Sweep for stale structures
|
||||||
jtVALIDATION_ut, // A validation from an untrusted source
|
jtVALIDATION_ut, // A validation from an untrusted source
|
||||||
|
jtMANIFEST, // A validator's manifest
|
||||||
jtUPDATE_PF, // Update pathfinding requests
|
jtUPDATE_PF, // Update pathfinding requests
|
||||||
jtTRANSACTION_l, // A local transaction
|
jtTRANSACTION_l, // A local transaction
|
||||||
jtREPLAY_REQ, // Peer request a ledger delta or a skip list
|
jtREPLAY_REQ, // Peer request a ledger delta or a skip list
|
||||||
|
|||||||
@@ -72,6 +72,7 @@ private:
|
|||||||
add(jtPACK, "makeFetchPack", 1, 0ms, 0ms);
|
add(jtPACK, "makeFetchPack", 1, 0ms, 0ms);
|
||||||
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms);
|
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms);
|
||||||
add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms);
|
add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms);
|
||||||
|
add(jtMANIFEST, "manifest", maxLimit, 2000ms, 5000ms);
|
||||||
add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms);
|
add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms);
|
||||||
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms);
|
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms);
|
||||||
add(jtLEDGER_REQ, "ledgerRequest", 3, 0ms, 0ms);
|
add(jtLEDGER_REQ, "ledgerRequest", 3, 0ms, 0ms);
|
||||||
|
|||||||
@@ -603,7 +603,7 @@ PeerImp::fail(std::string const& reason)
|
|||||||
return post(
|
return post(
|
||||||
strand_,
|
strand_,
|
||||||
std::bind(
|
std::bind(
|
||||||
(void (Peer::*)(std::string const&)) & PeerImp::fail,
|
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
||||||
shared_from_this(),
|
shared_from_this(),
|
||||||
reason));
|
reason));
|
||||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
||||||
@@ -1067,10 +1067,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
|
|||||||
if (s > 100)
|
if (s > 100)
|
||||||
fee_ = Resource::feeMediumBurdenPeer;
|
fee_ = Resource::feeMediumBurdenPeer;
|
||||||
|
|
||||||
// VFALCO What's the right job type?
|
|
||||||
auto that = shared_from_this();
|
|
||||||
app_.getJobQueue().addJob(
|
app_.getJobQueue().addJob(
|
||||||
jtVALIDATION_ut, "receiveManifests", [this, that, m]() {
|
jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
|
||||||
overlay_.onManifests(m, that);
|
overlay_.onManifests(m, that);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user