Improve directory insertion & deletion (RIPD-1353, RIPD-1488):

This commit introduces the "SortedDirectories" amendment, which
addresses two distinct issues:

First, it corrects a technical flaw that could, in some edge cases,
prevent an empty intermediate page from being deleted.

Second, it sorts directory entries within a page (other than order
book page entries, which remain strictly FIFO). This makes insert
operations deterministic, instead of pseudo-random and reliant on
temporal ordering.

Lastly, it removes the ability to perform a "soft delete" where
the page number of the item to delete need not be known if the
item is in the first 20 pages, and enforces a maximum limit to
the number of pages that a directory can span.
This commit is contained in:
Nik Bougalis
2017-06-13 19:06:55 -07:00
committed by seelabs
parent 3666948610
commit 463b154e3d
24 changed files with 1186 additions and 529 deletions

View File

@@ -19,11 +19,13 @@
#include <BeastConfig.h>
#include <ripple/basics/chrono.h>
#include <ripple/ledger/BookDirs.h>
#include <ripple/ledger/ReadView.h>
#include <ripple/ledger/View.h>
#include <ripple/basics/contract.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/protocol/Feature.h>
#include <ripple/protocol/st.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/Quality.h>
@@ -803,19 +805,28 @@ describeOwnerDir(AccountID const& account)
};
}
std::pair<TER, bool>
boost::optional<std::uint64_t>
dirAdd (ApplyView& view,
std::uint64_t& uNodeDir,
Keylet const& dir,
uint256 const& uLedgerIndex,
bool strictOrder,
std::function<void (SLE::ref)> fDescriber,
beast::Journal j)
{
if (view.rules().enabled(featureSortedDirectories))
{
if (strictOrder)
return view.dirAppend(dir, uLedgerIndex, fDescriber);
else
return view.dirInsert(dir, uLedgerIndex, fDescriber);
}
JLOG (j.trace()) << "dirAdd:" <<
" dir=" << to_string (dir.key) <<
" uLedgerIndex=" << to_string (uLedgerIndex);
auto sleRoot = view.peek(dir);
std::uint64_t uNodeDir = 0;
if (! sleRoot)
{
@@ -833,9 +844,7 @@ dirAdd (ApplyView& view,
"dirAdd: created root " << to_string (dir.key) <<
" for entry " << to_string (uLedgerIndex);
uNodeDir = 0;
return { tesSUCCESS, true };
return uNodeDir;
}
SLE::pointer sleNode;
@@ -865,7 +874,7 @@ dirAdd (ApplyView& view,
// Add to new node.
else if (!++uNodeDir)
{
return { tecDIR_FULL, false };
return boost::none;
}
else
{
@@ -901,28 +910,36 @@ dirAdd (ApplyView& view,
JLOG (j.trace()) <<
"dirAdd: appending: Node: " << strHex (uNodeDir);
return { tesSUCCESS, false };
return uNodeDir;
}
// Ledger must be in a state for this to work.
TER
dirDelete (ApplyView& view,
const bool bKeepRoot, // --> True, if we never completely clean up, after we overflow the root node.
const std::uint64_t& uNodeDir, // --> Node containing entry.
uint256 const& uRootIndex, // --> The index of the base of the directory. Nodes are based off of this.
std::uint64_t uNodeDir, // --> Node containing entry.
Keylet const& root, // --> The index of the base of the directory. Nodes are based off of this.
uint256 const& uLedgerIndex, // --> Value to remove from directory.
const bool bStable, // --> True, not to change relative order of entries.
const bool bSoft, // --> True, uNodeDir is not hard and fast (pass uNodeDir=0).
beast::Journal j)
{
if (view.rules().enabled(featureSortedDirectories))
{
if (view.dirRemove(root, uNodeDir, uLedgerIndex, bKeepRoot))
return tesSUCCESS;
return tefBAD_LEDGER;
}
std::uint64_t uNodeCur = uNodeDir;
SLE::pointer sleNode =
view.peek(keylet::page(uRootIndex, uNodeCur));
view.peek(keylet::page(root, uNodeCur));
if (!sleNode)
{
JLOG (j.warn()) << "dirDelete: no such node:" <<
" uRootIndex=" << to_string (uRootIndex) <<
" root=" << to_string (root.key) <<
" uNodeDir=" << strHex (uNodeDir) <<
" uLedgerIndex=" << to_string (uLedgerIndex);
@@ -936,7 +953,7 @@ dirDelete (ApplyView& view,
// Go the extra mile. Even if node doesn't exist, try the next node.
return dirDelete (view, bKeepRoot,
uNodeDir + 1, uRootIndex, uLedgerIndex, bStable, true, j);
uNodeDir + 1, root, uLedgerIndex, bStable, true, j);
}
else
{
@@ -961,7 +978,7 @@ dirDelete (ApplyView& view,
{
// Go the extra mile. Even if entry not in node, try the next node.
return dirDelete (view, bKeepRoot, uNodeDir + 1,
uRootIndex, uLedgerIndex, bStable, true, j);
root, uLedgerIndex, bStable, true, j);
}
return tefBAD_LEDGER;
@@ -1015,7 +1032,7 @@ dirDelete (ApplyView& view,
else
{
// Have only a root node and a last node.
auto sleLast = view.peek(keylet::page(uRootIndex, uNodeNext));
auto sleLast = view.peek(keylet::page(root, uNodeNext));
assert (sleLast);
@@ -1037,9 +1054,8 @@ dirDelete (ApplyView& view,
{
// Not root and not last node. Can delete node.
auto slePrevious =
view.peek(keylet::page(uRootIndex, uNodePrevious));
auto sleNext = view.peek(keylet::page(uRootIndex, uNodeNext));
auto slePrevious = view.peek(keylet::page(root, uNodePrevious));
auto sleNext = view.peek(keylet::page(root, uNodeNext));
assert (slePrevious);
if (!slePrevious)
{
@@ -1072,8 +1088,7 @@ dirDelete (ApplyView& view,
else
{
// Last and only node besides the root.
auto sleRoot = view.peek (keylet::page(uRootIndex));
auto sleRoot = view.peek(root);
assert (sleRoot);
if (sleRoot->getFieldV256 (sfIndexes).empty ())
@@ -1122,86 +1137,77 @@ trustCreate (ApplyView& view,
ltRIPPLE_STATE, uIndex);
view.insert (sleRippleState);
std::uint64_t uLowNode;
std::uint64_t uHighNode;
auto lowNode = dirAdd (view, keylet::ownerDir (uLowAccountID),
sleRippleState->key(), false, describeOwnerDir (uLowAccountID), j);
TER terResult;
if (!lowNode)
return tecDIR_FULL;
std::tie (terResult, std::ignore) = dirAdd (view,
uLowNode, keylet::ownerDir (uLowAccountID),
sleRippleState->key(),
describeOwnerDir (uLowAccountID), j);
auto highNode = dirAdd (view, keylet::ownerDir (uHighAccountID),
sleRippleState->key(), false, describeOwnerDir (uHighAccountID), j);
if (tesSUCCESS == terResult)
if (!highNode)
return tecDIR_FULL;
const bool bSetDst = saLimit.getIssuer () == uDstAccountID;
const bool bSetHigh = bSrcHigh ^ bSetDst;
assert (sleAccount->getAccountID (sfAccount) ==
(bSetHigh ? uHighAccountID : uLowAccountID));
auto slePeer = view.peek (keylet::account(
bSetHigh ? uLowAccountID : uHighAccountID));
assert (slePeer);
// Remember deletion hints.
sleRippleState->setFieldU64 (sfLowNode, *lowNode);
sleRippleState->setFieldU64 (sfHighNode, *highNode);
sleRippleState->setFieldAmount (
bSetHigh ? sfHighLimit : sfLowLimit, saLimit);
sleRippleState->setFieldAmount (
bSetHigh ? sfLowLimit : sfHighLimit,
STAmount ({saBalance.getCurrency (),
bSetDst ? uSrcAccountID : uDstAccountID}));
if (uQualityIn)
sleRippleState->setFieldU32 (
bSetHigh ? sfHighQualityIn : sfLowQualityIn, uQualityIn);
if (uQualityOut)
sleRippleState->setFieldU32 (
bSetHigh ? sfHighQualityOut : sfLowQualityOut, uQualityOut);
std::uint32_t uFlags = bSetHigh ? lsfHighReserve : lsfLowReserve;
if (bAuth)
{
std::tie (terResult, std::ignore) = dirAdd (view,
uHighNode, keylet::ownerDir (uHighAccountID),
sleRippleState->key(),
describeOwnerDir (uHighAccountID), j);
uFlags |= (bSetHigh ? lsfHighAuth : lsfLowAuth);
}
if (bNoRipple)
{
uFlags |= (bSetHigh ? lsfHighNoRipple : lsfLowNoRipple);
}
if (bFreeze)
{
uFlags |= (!bSetHigh ? lsfLowFreeze : lsfHighFreeze);
}
if (tesSUCCESS == terResult)
if ((slePeer->getFlags() & lsfDefaultRipple) == 0)
{
const bool bSetDst = saLimit.getIssuer () == uDstAccountID;
const bool bSetHigh = bSrcHigh ^ bSetDst;
assert (sleAccount->getAccountID (sfAccount) ==
(bSetHigh ? uHighAccountID : uLowAccountID));
auto slePeer = view.peek (keylet::account(
bSetHigh ? uLowAccountID : uHighAccountID));
assert (slePeer);
// Remember deletion hints.
sleRippleState->setFieldU64 (sfLowNode, uLowNode);
sleRippleState->setFieldU64 (sfHighNode, uHighNode);
sleRippleState->setFieldAmount (
bSetHigh ? sfHighLimit : sfLowLimit, saLimit);
sleRippleState->setFieldAmount (
bSetHigh ? sfLowLimit : sfHighLimit,
STAmount ({saBalance.getCurrency (),
bSetDst ? uSrcAccountID : uDstAccountID}));
if (uQualityIn)
sleRippleState->setFieldU32 (
bSetHigh ? sfHighQualityIn : sfLowQualityIn, uQualityIn);
if (uQualityOut)
sleRippleState->setFieldU32 (
bSetHigh ? sfHighQualityOut : sfLowQualityOut, uQualityOut);
std::uint32_t uFlags = bSetHigh ? lsfHighReserve : lsfLowReserve;
if (bAuth)
{
uFlags |= (bSetHigh ? lsfHighAuth : lsfLowAuth);
}
if (bNoRipple)
{
uFlags |= (bSetHigh ? lsfHighNoRipple : lsfLowNoRipple);
}
if (bFreeze)
{
uFlags |= (!bSetHigh ? lsfLowFreeze : lsfHighFreeze);
}
if ((slePeer->getFlags() & lsfDefaultRipple) == 0)
{
// The other side's default is no rippling
uFlags |= (bSetHigh ? lsfLowNoRipple : lsfHighNoRipple);
}
sleRippleState->setFieldU32 (sfFlags, uFlags);
adjustOwnerCount(view, sleAccount, 1, j);
// ONLY: Create ripple balance.
sleRippleState->setFieldAmount (sfBalance, bSetHigh ? -saBalance : saBalance);
view.creditHook (uSrcAccountID,
uDstAccountID, saBalance, saBalance.zeroed());
// The other side's default is no rippling
uFlags |= (bSetHigh ? lsfLowNoRipple : lsfHighNoRipple);
}
return terResult;
sleRippleState->setFieldU32 (sfFlags, uFlags);
adjustOwnerCount(view, sleAccount, 1, j);
// ONLY: Create ripple balance.
sleRippleState->setFieldAmount (sfBalance, bSetHigh ? -saBalance : saBalance);
view.creditHook (uSrcAccountID,
uDstAccountID, saBalance, saBalance.zeroed());
return tesSUCCESS;
}
TER
@@ -1223,7 +1229,7 @@ trustDelete (ApplyView& view,
terResult = dirDelete(view,
false,
uLowNode,
getOwnerDirIndex (uLowAccountID),
keylet::ownerDir (uLowAccountID),
sleRippleState->key(),
false,
!bLowNode,
@@ -1236,7 +1242,7 @@ trustDelete (ApplyView& view,
terResult = dirDelete (view,
false,
uHighNode,
getOwnerDirIndex (uHighAccountID),
keylet::ownerDir (uHighAccountID),
sleRippleState->key(),
false,
!bHighNode,
@@ -1261,14 +1267,12 @@ offerDelete (ApplyView& view,
// Detect legacy directories.
bool bOwnerNode = sle->isFieldPresent (sfOwnerNode);
std::uint64_t uOwnerNode = sle->getFieldU64 (sfOwnerNode);
uint256 uDirectory = sle->getFieldH256 (sfBookDirectory);
std::uint64_t uBookNode = sle->getFieldU64 (sfBookNode);
TER terResult = dirDelete (view, false, uOwnerNode,
getOwnerDirIndex (owner), offerIndex, false, !bOwnerNode, j);
TER terResult2 = dirDelete (view, false, uBookNode,
uDirectory, offerIndex, true, false, j);
TER terResult = dirDelete (view, false, sle->getFieldU64 (sfOwnerNode),
keylet::ownerDir (owner), offerIndex, false, !bOwnerNode, j);
TER terResult2 = dirDelete (view, false, sle->getFieldU64 (sfBookNode),
keylet::page (uDirectory), offerIndex, true, false, j);
if (tesSUCCESS == terResult)
adjustOwnerCount(view, view.peek(