Correct a technical flaw in the spinlock code:

The existing spinlock code, used to protect SHAMapInnerNode
child lists, has a mistake that can allow the same child to
be repeatedly locked under some circumstances.

The bug was in the `SpinBitlock::lock` loop condition check
and could cause the loop to terminate early, returning
before the lock had actually been acquired.

This commit fixes the bug and further simplifies the lock
loop, making the correctness of the code easier to verify
without sacrificing performance.
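
To see the flaw: the exit condition of the old loop (visible in the
removed code below) tests whether the lock bits look free, so if another
thread still holds the lock the `do`/`while` can fall through and
`lock()` returns without having acquired anything; the caller's later
`unlock()` then clears a bit it never owned. The fix is to keep retrying
until `try_lock()` actually succeeds. The following is a minimal sketch
only, reusing the `bits_`/`mask_`/`try_lock` names from the removed
class; the actual replacement lives in the new spinlock header, which is
not part of the excerpt below:

    void
    lock()
    {
        while (!try_lock())
        {
            // Wait until the lock at least appears free before retrying.
            // The relaxed load keeps the wait cheap; try_lock() still
            // performs the acquire that makes the acquisition safe.
            while ((bits_.load(std::memory_order_relaxed) & mask_) != 0)
                ;  // spin; a real implementation would pause or yield here
        }
    }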

It also promotes the spinlock class from an implementation
detail to a more general-purpose, easier-to-use lock class
with clearer semantics. Two different lock types,
`packed_spinlock` and `spinlock`, now allow developers to
easily grab either a single spinlock from a group of
spinlocks (packed in an unsigned integer) or all of the
spinlocks at once.
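
As a rough usage sketch, based on the call sites visible in the diff
below (the 16-bit atomic `lock_` and the `index` variable here are
placeholders standing in for whatever state the caller actually has):

    std::atomic<std::uint16_t> lock_{0};
    int index = 5;  // which of the 16 packed locks to take

    // Take just one of the 16 packed spinlocks, e.g. to touch one child:
    {
        packed_spinlock sl(lock_, index);
        std::lock_guard lock(sl);
        // ... read or write the one guarded slot ...
    }

    // Take all of the spinlocks at once, e.g. to restructure everything:
    {
        spinlock sl(lock_);
        std::lock_guard lock(sl);
        // ... touch any of the guarded state ...
    }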

While this commit makes spinlocks more widely available to
developers, they are rarely the best tool for the job. Use
them judiciously and only after careful consideration.

Nik Bougalis
2022-06-08 12:18:04 -07:00
parent 59326bbbc5
commit 7e46f5342b
3 changed files with 229 additions and 88 deletions

src/ripple/shamap/impl/SHAMapInnerNode.cpp

@@ -22,102 +22,19 @@
 #include <ripple/basics/Log.h>
 #include <ripple/basics/Slice.h>
 #include <ripple/basics/contract.h>
+#include <ripple/basics/spinlock.h>
 #include <ripple/beast/core/LexicalCast.h>
 #include <ripple/protocol/HashPrefix.h>
 #include <ripple/protocol/digest.h>
 #include <ripple/shamap/SHAMapTreeNode.h>
 #include <ripple/shamap/impl/TaggedPointer.ipp>
 #include <openssl/sha.h>
 #include <algorithm>
 #include <iterator>
 #include <utility>

-#ifndef __aarch64__
-// This is used for the _mm_pause instruction:
-#include <immintrin.h>
-#endif
-
 namespace ripple {

-/** A specialized 16-way spinlock used to protect inner node branches.
-
-    This class packs 16 separate spinlocks into a single 16-bit value. It makes
-    it possible to lock any one lock at once or, alternatively, all together.
-
-    The implementation tries to use portable constructs but has to be low-level
-    for performance.
-*/
-class SpinBitlock
-{
-private:
-    std::atomic<std::uint16_t>& bits_;
-    std::uint16_t mask_;
-
-public:
-    SpinBitlock(std::atomic<std::uint16_t>& lock) : bits_(lock), mask_(0xFFFF)
-    {
-    }
-
-    SpinBitlock(std::atomic<std::uint16_t>& lock, int index)
-        : bits_(lock), mask_(1 << index)
-    {
-        assert(index >= 0 && index < 16);
-    }
-
-    [[nodiscard]] bool
-    try_lock()
-    {
-        // If we want to grab all the individual bitlocks at once we cannot
-        // use `fetch_or`! To see why, imagine that `lock_ == 0x0020` which
-        // means that the `fetch_or` would return `0x0020` but all the bits
-        // would already be (incorrectly!) set. Oops!
-        std::uint16_t expected = 0;
-
-        if (mask_ != 0xFFFF)
-            return (bits_.fetch_or(mask_, std::memory_order_acquire) & mask_) ==
-                expected;
-
-        return bits_.compare_exchange_weak(
-            expected,
-            mask_,
-            std::memory_order_acquire,
-            std::memory_order_relaxed);
-    }
-
-    void
-    lock()
-    {
-        // Testing suggests that 99.9999% of the time this will succeed, so
-        // we try to optimize the fast path.
-        if (try_lock())
-            return;
-
-        do
-        {
-            // We try to spin for a few times:
-            for (int i = 0; i != 100; ++i)
-            {
-                if (try_lock())
-                    return;
-
-#ifndef __aarch64__
-                _mm_pause();
-#endif
-            }
-
-            std::this_thread::yield();
-        } while ((bits_.load(std::memory_order_relaxed) & mask_) == 0);
-    }
-
-    void
-    unlock()
-    {
-        bits_.fetch_and(~mask_, std::memory_order_release);
-    }
-};

 SHAMapInnerNode::SHAMapInnerNode(
     std::uint32_t cowid,
     std::uint8_t numAllocatedChildren)
@@ -185,7 +102,7 @@ SHAMapInnerNode::clone(std::uint32_t cowid) const
         });
     }

-    SpinBitlock sl(lock_);
+    spinlock sl(lock_);
     std::lock_guard lock(sl);

     if (thisIsSparse)
@@ -422,7 +339,7 @@ SHAMapInnerNode::getChildPointer(int branch)

     auto const index = *getChildIndex(branch);

-    SpinBitlock sl(lock_, index);
+    packed_spinlock sl(lock_, index);
     std::lock_guard lock(sl);
     return hashesAndChildren_.getChildren()[index].get();
 }
@@ -435,7 +352,7 @@ SHAMapInnerNode::getChild(int branch)

     auto const index = *getChildIndex(branch);

-    SpinBitlock sl(lock_, index);
+    packed_spinlock sl(lock_, index);
     std::lock_guard lock(sl);
     return hashesAndChildren_.getChildren()[index];
 }
@@ -462,7 +379,7 @@ SHAMapInnerNode::canonicalizeChild(
     auto [_, hashes, children] = hashesAndChildren_.getHashesAndChildren();
     assert(node->getHash() == hashes[childIndex]);

-    SpinBitlock sl(lock_, childIndex);
+    packed_spinlock sl(lock_, childIndex);
     std::lock_guard lock(sl);

     if (children[childIndex])