mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-18 18:15:50 +00:00
Compare commits
6 Commits
vlntb/fix-
...
vlntb/remo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ce692db58 | ||
|
|
9ec2d7f8ff | ||
|
|
4a084ce34c | ||
|
|
3502df2174 | ||
|
|
fa1e25abef | ||
|
|
217ba8dd4d |
@@ -132,7 +132,7 @@ enum LedgerSpecificFlags {
|
||||
lsfNoFreeze = 0x00200000, // True, cannot freeze ripple states
|
||||
lsfGlobalFreeze = 0x00400000, // True, all assets frozen
|
||||
lsfDefaultRipple =
|
||||
0x00800000, // True, trust lines allow rippling by default
|
||||
0x00800000, // True, incoming trust lines allow rippling by default
|
||||
lsfDepositAuth = 0x01000000, // True, all deposits require authorization
|
||||
/* // reserved for Hooks amendment
|
||||
lsfTshCollect = 0x02000000, // True, allow TSH collect-calls to acc hooks
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/core/Config.h>
|
||||
|
||||
#include <xrpl/basics/chrono.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
@@ -27,12 +28,22 @@ namespace test {
|
||||
|
||||
class HashRouter_test : public beast::unit_test::suite
|
||||
{
|
||||
HashRouter::Setup
|
||||
getSetup(std::chrono::seconds hold, std::chrono::seconds relay)
|
||||
{
|
||||
HashRouter::Setup setup;
|
||||
setup.holdTime = hold;
|
||||
setup.relayTime = relay;
|
||||
return setup;
|
||||
}
|
||||
|
||||
void
|
||||
testNonExpiration()
|
||||
{
|
||||
testcase("Non-expiration");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -67,9 +78,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testExpiration()
|
||||
{
|
||||
testcase("Expiration");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -144,10 +156,11 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testSuppression()
|
||||
{
|
||||
testcase("Suppression");
|
||||
// Normal HashRouter
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -173,9 +186,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testSetFlags()
|
||||
{
|
||||
testcase("Set Flags");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
BEAST_EXPECT(router.setFlags(key1, 10));
|
||||
@@ -186,9 +200,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testRelay()
|
||||
{
|
||||
testcase("Relay");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 1s);
|
||||
HashRouter router(getSetup(50s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
|
||||
@@ -229,9 +244,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testProcess()
|
||||
{
|
||||
testcase("Process");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 5s);
|
||||
HashRouter router(getSetup(5s, 1s), stopwatch);
|
||||
uint256 const key(1);
|
||||
HashRouter::PeerShortID peer = 1;
|
||||
int flags;
|
||||
@@ -243,6 +259,111 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
|
||||
}
|
||||
|
||||
void
|
||||
testSetup()
|
||||
{
|
||||
testcase("setup_HashRouter");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
{
|
||||
Config cfg;
|
||||
// default
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
BEAST_EXPECT(setup.holdTime == 300s);
|
||||
BEAST_EXPECT(setup.relayTime == 30s);
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// non-default
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "600");
|
||||
h.set("relay_time", "15");
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
BEAST_EXPECT(setup.holdTime == 600s);
|
||||
BEAST_EXPECT(setup.relayTime == 15s);
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// equal
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "400");
|
||||
h.set("relay_time", "400");
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
BEAST_EXPECT(setup.holdTime == 400s);
|
||||
BEAST_EXPECT(setup.relayTime == 400s);
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// wrong order
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "60");
|
||||
h.set("relay_time", "120");
|
||||
try
|
||||
{
|
||||
setup_HashRouter(cfg);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string expected =
|
||||
"HashRouter relay time must be less than or equal to hold "
|
||||
"time";
|
||||
BEAST_EXPECT(e.what() == expected);
|
||||
}
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// too small hold
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "10");
|
||||
h.set("relay_time", "120");
|
||||
try
|
||||
{
|
||||
setup_HashRouter(cfg);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string expected =
|
||||
"HashRouter hold time must be at least 12 seconds (the "
|
||||
"approximate validation time for three "
|
||||
"ledgers).";
|
||||
BEAST_EXPECT(e.what() == expected);
|
||||
}
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// too small relay
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "500");
|
||||
h.set("relay_time", "6");
|
||||
try
|
||||
{
|
||||
setup_HashRouter(cfg);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string expected =
|
||||
"HashRouter relay time must be at least 8 seconds (the "
|
||||
"approximate validation time for two ledgers).";
|
||||
BEAST_EXPECT(e.what() == expected);
|
||||
}
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// garbage
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "alice");
|
||||
h.set("relay_time", "bob");
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
// The set function ignores values that don't covert, so the
|
||||
// defaults are left unchanged
|
||||
BEAST_EXPECT(setup.holdTime == 300s);
|
||||
BEAST_EXPECT(setup.relayTime == 30s);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
@@ -253,6 +374,7 @@ public:
|
||||
testSetFlags();
|
||||
testRelay();
|
||||
testProcess();
|
||||
testSetup();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -34,6 +34,11 @@ namespace ripple {
|
||||
class LocalTxs
|
||||
{
|
||||
public:
|
||||
// The number of ledgers to hold a transaction is essentially
|
||||
// arbitrary. It should be sufficient to allow the transaction to
|
||||
// get into a fully-validated ledger.
|
||||
static constexpr int holdLedgers = 5;
|
||||
|
||||
virtual ~LocalTxs() = default;
|
||||
|
||||
// Add a new local transaction
|
||||
|
||||
@@ -454,25 +454,17 @@ LedgerMaster::storeLedger(std::shared_ptr<Ledger const> ledger)
|
||||
void
|
||||
LedgerMaster::applyHeldTransactions()
|
||||
{
|
||||
std::lock_guard sl(m_mutex);
|
||||
CanonicalTXSet const set = [this]() {
|
||||
std::lock_guard sl(m_mutex);
|
||||
// VFALCO NOTE The hash for an open ledger is undefined so we use
|
||||
// something that is a reasonable substitute.
|
||||
CanonicalTXSet set(app_.openLedger().current()->info().parentHash);
|
||||
std::swap(mHeldTransactions, set);
|
||||
return set;
|
||||
}();
|
||||
|
||||
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
|
||||
bool any = false;
|
||||
for (auto const& it : mHeldTransactions)
|
||||
{
|
||||
ApplyFlags flags = tapNONE;
|
||||
auto const result =
|
||||
app_.getTxQ().apply(app_, view, it.second, flags, j);
|
||||
any |= result.applied;
|
||||
}
|
||||
return any;
|
||||
});
|
||||
|
||||
// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
|
||||
// it.
|
||||
// VFALCO NOTE The hash for an open ledger is undefined so we use
|
||||
// something that is a reasonable substitute.
|
||||
mHeldTransactions.reset(app_.openLedger().current()->info().parentHash);
|
||||
if (!set.empty())
|
||||
app_.getOPs().processTransactionSet(set);
|
||||
}
|
||||
|
||||
std::shared_ptr<STTx const>
|
||||
|
||||
@@ -53,14 +53,9 @@ namespace ripple {
|
||||
class LocalTx
|
||||
{
|
||||
public:
|
||||
// The number of ledgers to hold a transaction is essentially
|
||||
// arbitrary. It should be sufficient to allow the transaction to
|
||||
// get into a fully-validated ledger.
|
||||
static int const holdLedgers = 5;
|
||||
|
||||
LocalTx(LedgerIndex index, std::shared_ptr<STTx const> const& txn)
|
||||
: m_txn(txn)
|
||||
, m_expire(index + holdLedgers)
|
||||
, m_expire(index + LocalTxs::holdLedgers)
|
||||
, m_id(txn->getTransactionID())
|
||||
, m_account(txn->getAccountID(sfAccount))
|
||||
, m_seqProxy(txn->getSeqProxy())
|
||||
|
||||
@@ -442,8 +442,8 @@ public:
|
||||
std::make_unique<LoadFeeTrack>(logs_->journal("LoadManager")))
|
||||
|
||||
, hashRouter_(std::make_unique<HashRouter>(
|
||||
stopwatch(),
|
||||
HashRouter::getDefaultHoldTime()))
|
||||
setup_HashRouter(*config_),
|
||||
stopwatch()))
|
||||
|
||||
, mValidations(
|
||||
ValidationParms(),
|
||||
|
||||
@@ -20,11 +20,15 @@
|
||||
#ifndef RIPPLE_APP_MAIN_TUNING_H_INCLUDED
|
||||
#define RIPPLE_APP_MAIN_TUNING_H_INCLUDED
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
constexpr std::size_t fullBelowTargetSize = 524288;
|
||||
constexpr std::chrono::seconds fullBelowExpiration = std::chrono::minutes{10};
|
||||
|
||||
constexpr std::size_t maxPoppedTransactions = 10;
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
|
||||
@@ -66,18 +66,22 @@ CanonicalTXSet::popAcctTransaction(std::shared_ptr<STTx const> const& tx)
|
||||
// 1. Prioritize transactions with Sequences over transactions with
|
||||
// Tickets.
|
||||
//
|
||||
// 2. Don't worry about consecutive Sequence numbers. Creating Tickets
|
||||
// can introduce a discontinuity in Sequence numbers.
|
||||
// 2. For transactions not using Tickets, look for consecutive Sequence
|
||||
// numbers. For transactions using Tickets, don't worry about
|
||||
// consecutive Sequence numbers. Tickets can process out of order.
|
||||
//
|
||||
// 3. After handling all transactions with Sequences, return Tickets
|
||||
// with the lowest Ticket ID first.
|
||||
std::shared_ptr<STTx const> result;
|
||||
uint256 const effectiveAccount{accountKey(tx->getAccountID(sfAccount))};
|
||||
|
||||
Key const after(effectiveAccount, tx->getSeqProxy(), beast::zero);
|
||||
auto const seqProxy = tx->getSeqProxy();
|
||||
Key const after(effectiveAccount, seqProxy, beast::zero);
|
||||
auto const itrNext{map_.lower_bound(after)};
|
||||
if (itrNext != map_.end() &&
|
||||
itrNext->first.getAccount() == effectiveAccount)
|
||||
itrNext->first.getAccount() == effectiveAccount &&
|
||||
(!itrNext->second->getSeqProxy().isSeq() ||
|
||||
itrNext->second->getSeqProxy().value() == seqProxy.value() + 1))
|
||||
{
|
||||
result = std::move(itrNext->second);
|
||||
map_.erase(itrNext);
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/core/Config.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -33,7 +34,7 @@ HashRouter::emplace(uint256 const& key) -> std::pair<Entry&, bool>
|
||||
}
|
||||
|
||||
// See if any supressions need to be expired
|
||||
expire(suppressionMap_, holdTime_);
|
||||
expire(suppressionMap_, setup_.holdTime);
|
||||
|
||||
return std::make_pair(
|
||||
std::ref(suppressionMap_.emplace(key, Entry()).first->second), true);
|
||||
@@ -122,10 +123,45 @@ HashRouter::shouldRelay(uint256 const& key)
|
||||
|
||||
auto& s = emplace(key).first;
|
||||
|
||||
if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
|
||||
if (!s.shouldRelay(suppressionMap_.clock().now(), setup_.relayTime))
|
||||
return {};
|
||||
|
||||
return s.releasePeerSet();
|
||||
}
|
||||
|
||||
HashRouter::Setup
|
||||
setup_HashRouter(Config const& config)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
HashRouter::Setup setup;
|
||||
auto const& section = config.section("hashrouter");
|
||||
|
||||
std::int32_t tmp;
|
||||
|
||||
if (set(tmp, "hold_time", section))
|
||||
{
|
||||
if (tmp < 12)
|
||||
Throw<std::runtime_error>(
|
||||
"HashRouter hold time must be at least 12 seconds (the "
|
||||
"approximate validation time for three ledgers).");
|
||||
setup.holdTime = seconds(tmp);
|
||||
}
|
||||
if (set(tmp, "relay_time", section))
|
||||
{
|
||||
if (tmp < 8)
|
||||
Throw<std::runtime_error>(
|
||||
"HashRouter relay time must be at least 8 seconds (the "
|
||||
"approximate validation time for two ledgers).");
|
||||
setup.relayTime = seconds(tmp);
|
||||
}
|
||||
if (setup.relayTime > setup.holdTime)
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
"HashRouter relay time must be less than or equal to hold time");
|
||||
}
|
||||
|
||||
return setup;
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
@@ -33,6 +33,7 @@ namespace ripple {
|
||||
// TODO convert these macros to int constants or an enum
|
||||
#define SF_BAD 0x02 // Temporarily bad
|
||||
#define SF_SAVED 0x04
|
||||
#define SF_HELD 0x08 // Held by LedgerMaster after potential processing failure
|
||||
#define SF_TRUSTED 0x10 // comes from trusted source
|
||||
|
||||
// Private flags, used internally in apply.cpp.
|
||||
@@ -44,6 +45,8 @@ namespace ripple {
|
||||
#define SF_PRIVATE5 0x1000
|
||||
#define SF_PRIVATE6 0x2000
|
||||
|
||||
class Config;
|
||||
|
||||
/** Routing table for objects identified by hash.
|
||||
|
||||
This table keeps track of which hashes have been received by which peers.
|
||||
@@ -56,6 +59,30 @@ public:
|
||||
// The type here *MUST* match the type of Peer::id_t
|
||||
using PeerShortID = std::uint32_t;
|
||||
|
||||
/** Structure used to customize @ref HashRouter behavior.
|
||||
*
|
||||
* Even though these items are configurable, they are undocumented. Don't
|
||||
* change them unless there is a good reason, and network-wide coordination
|
||||
* to do it.
|
||||
*
|
||||
* Configuration is processed in setup_HashRouter.
|
||||
*/
|
||||
struct Setup
|
||||
{
|
||||
/// Default constructor
|
||||
explicit Setup() = default;
|
||||
|
||||
using seconds = std::chrono::seconds;
|
||||
|
||||
/** Expiration time for a hash entry
|
||||
*/
|
||||
seconds holdTime{300};
|
||||
|
||||
/** Amount of time required before a relayed item will be relayed again.
|
||||
*/
|
||||
seconds relayTime{30};
|
||||
};
|
||||
|
||||
private:
|
||||
/** An entry in the routing table.
|
||||
*/
|
||||
@@ -108,9 +135,9 @@ private:
|
||||
bool
|
||||
shouldRelay(
|
||||
Stopwatch::time_point const& now,
|
||||
std::chrono::seconds holdTime)
|
||||
std::chrono::seconds relayTime)
|
||||
{
|
||||
if (relayed_ && *relayed_ + holdTime > now)
|
||||
if (relayed_ && *relayed_ + relayTime > now)
|
||||
return false;
|
||||
relayed_.emplace(now);
|
||||
return true;
|
||||
@@ -135,16 +162,8 @@ private:
|
||||
};
|
||||
|
||||
public:
|
||||
static inline std::chrono::seconds
|
||||
getDefaultHoldTime()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
return 300s;
|
||||
}
|
||||
|
||||
HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
|
||||
: suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds)
|
||||
HashRouter(Setup const& setup, Stopwatch& clock)
|
||||
: setup_(setup), suppressionMap_(clock)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -195,11 +214,11 @@ public:
|
||||
Effects:
|
||||
|
||||
If the item should be relayed, this function will not
|
||||
return `true` again until the hold time has expired.
|
||||
return a seated optional again until the relay time has expired.
|
||||
The internal set of peers will also be reset.
|
||||
|
||||
@return A `std::optional` set of peers which do not need to be
|
||||
relayed to. If the result is uninitialized, the item should
|
||||
relayed to. If the result is unseated, the item should
|
||||
_not_ be relayed.
|
||||
*/
|
||||
std::optional<std::set<PeerShortID>>
|
||||
@@ -212,6 +231,9 @@ private:
|
||||
|
||||
std::mutex mutable mutex_;
|
||||
|
||||
// Configurable parameters
|
||||
Setup const setup_;
|
||||
|
||||
// Stores all suppressed hashes and their expiration time
|
||||
beast::aged_unordered_map<
|
||||
uint256,
|
||||
@@ -219,10 +241,11 @@ private:
|
||||
Stopwatch::clock_type,
|
||||
hardened_hash<strong_hash>>
|
||||
suppressionMap_;
|
||||
|
||||
std::chrono::seconds const holdTime_;
|
||||
};
|
||||
|
||||
HashRouter::Setup
|
||||
setup_HashRouter(Config const&);
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include <xrpld/app/ledger/OrderBookDB.h>
|
||||
#include <xrpld/app/ledger/TransactionMaster.h>
|
||||
#include <xrpld/app/main/LoadManager.h>
|
||||
#include <xrpld/app/main/Tuning.h>
|
||||
#include <xrpld/app/misc/AmendmentTable.h>
|
||||
#include <xrpld/app/misc/DeliverMax.h>
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
@@ -298,6 +299,9 @@ public:
|
||||
bool bLocal,
|
||||
FailHard failType) override;
|
||||
|
||||
void
|
||||
processTransactionSet(CanonicalTXSet const& set) override;
|
||||
|
||||
/**
|
||||
* For transactions submitted directly by a client, apply batch of
|
||||
* transactions and wait for this transaction to complete.
|
||||
@@ -327,6 +331,16 @@ public:
|
||||
bool bUnlimited,
|
||||
FailHard failtype);
|
||||
|
||||
private:
|
||||
bool
|
||||
preProcessTransaction(std::shared_ptr<Transaction>& transaction);
|
||||
|
||||
void
|
||||
doTransactionSyncBatch(
|
||||
std::unique_lock<std::mutex>& lock,
|
||||
std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
|
||||
|
||||
public:
|
||||
/**
|
||||
* Apply transactions in batches. Continue until none are queued.
|
||||
*/
|
||||
@@ -1221,14 +1235,9 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::processTransaction(
|
||||
std::shared_ptr<Transaction>& transaction,
|
||||
bool bUnlimited,
|
||||
bool bLocal,
|
||||
FailHard failType)
|
||||
bool
|
||||
NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
|
||||
{
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
|
||||
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
|
||||
|
||||
if ((newFlags & SF_BAD) != 0)
|
||||
@@ -1237,7 +1246,7 @@ NetworkOPsImp::processTransaction(
|
||||
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
|
||||
transaction->setStatus(INVALID);
|
||||
transaction->setResult(temBAD_SIGNATURE);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// NOTE eahennis - I think this check is redundant,
|
||||
@@ -1260,12 +1269,28 @@ NetworkOPsImp::processTransaction(
|
||||
transaction->setStatus(INVALID);
|
||||
transaction->setResult(temBAD_SIGNATURE);
|
||||
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// canonicalize can change our pointer
|
||||
app_.getMasterTransaction().canonicalize(&transaction);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::processTransaction(
|
||||
std::shared_ptr<Transaction>& transaction,
|
||||
bool bUnlimited,
|
||||
bool bLocal,
|
||||
FailHard failType)
|
||||
{
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
|
||||
|
||||
// preProcessTransaction can change our pointer
|
||||
if (!preProcessTransaction(transaction))
|
||||
return;
|
||||
|
||||
if (bLocal)
|
||||
doTransactionSync(transaction, bUnlimited, failType);
|
||||
else
|
||||
@@ -1312,6 +1337,17 @@ NetworkOPsImp::doTransactionSync(
|
||||
transaction->setApplying();
|
||||
}
|
||||
|
||||
doTransactionSyncBatch(
|
||||
lock, [&transaction](std::unique_lock<std::mutex> const&) {
|
||||
return transaction->getApplying();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::doTransactionSyncBatch(
|
||||
std::unique_lock<std::mutex>& lock,
|
||||
std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
|
||||
{
|
||||
do
|
||||
{
|
||||
if (mDispatchState == DispatchState::running)
|
||||
@@ -1334,7 +1370,70 @@ NetworkOPsImp::doTransactionSync(
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (transaction->getApplying());
|
||||
} while (retryCallback(lock));
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
|
||||
{
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
|
||||
std::vector<std::shared_ptr<Transaction>> candidates;
|
||||
candidates.reserve(set.size());
|
||||
for (auto const& [_, tx] : set)
|
||||
{
|
||||
std::string reason;
|
||||
auto transaction = std::make_shared<Transaction>(tx, reason, app_);
|
||||
|
||||
if (transaction->getStatus() == INVALID)
|
||||
{
|
||||
if (!reason.empty())
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "Exception checking transaction: " << reason;
|
||||
}
|
||||
app_.getHashRouter().setFlags(tx->getTransactionID(), SF_BAD);
|
||||
continue;
|
||||
}
|
||||
|
||||
// preProcessTransaction can change our pointer
|
||||
if (!preProcessTransaction(transaction))
|
||||
continue;
|
||||
|
||||
candidates.emplace_back(transaction);
|
||||
}
|
||||
|
||||
std::vector<TransactionStatus> transactions;
|
||||
transactions.reserve(candidates.size());
|
||||
|
||||
std::unique_lock lock(mMutex);
|
||||
|
||||
for (auto& transaction : candidates)
|
||||
{
|
||||
if (!transaction->getApplying())
|
||||
{
|
||||
transactions.emplace_back(transaction, false, false, FailHard::no);
|
||||
transaction->setApplying();
|
||||
}
|
||||
}
|
||||
|
||||
if (mTransactions.empty())
|
||||
mTransactions.swap(transactions);
|
||||
else
|
||||
{
|
||||
mTransactions.reserve(mTransactions.size() + transactions.size());
|
||||
for (auto& t : transactions)
|
||||
mTransactions.push_back(std::move(t));
|
||||
}
|
||||
|
||||
doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
|
||||
XRPL_ASSERT(
|
||||
lock.owns_lock(),
|
||||
"ripple::NetworkOPsImp::processTransactionSet has lock");
|
||||
return std::any_of(
|
||||
mTransactions.begin(), mTransactions.end(), [](auto const& t) {
|
||||
return t.transaction->getApplying();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
@@ -1441,16 +1540,28 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
<< "Transaction is now included in open ledger";
|
||||
e.transaction->setStatus(INCLUDED);
|
||||
|
||||
// Pop as many "reasonable" transactions for this account as
|
||||
// possible. "Reasonable" means they have sequential sequence
|
||||
// numbers, or use tickets.
|
||||
auto const& txCur = e.transaction->getSTransaction();
|
||||
auto const txNext = m_ledgerMaster.popAcctTransaction(txCur);
|
||||
if (txNext)
|
||||
|
||||
std::size_t count = 0;
|
||||
for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
|
||||
txNext && count < maxPoppedTransactions;
|
||||
txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
|
||||
{
|
||||
if (!batchLock.owns_lock())
|
||||
batchLock.lock();
|
||||
std::string reason;
|
||||
auto const trans = sterilize(*txNext);
|
||||
auto t = std::make_shared<Transaction>(trans, reason, app_);
|
||||
if (t->getApplying())
|
||||
break;
|
||||
submit_held.emplace_back(t, false, false, FailHard::no);
|
||||
t->setApplying();
|
||||
}
|
||||
if (batchLock.owns_lock())
|
||||
batchLock.unlock();
|
||||
}
|
||||
else if (e.result == tefPAST_SEQ)
|
||||
{
|
||||
@@ -1472,16 +1583,54 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
e.transaction->setQueued();
|
||||
e.transaction->setKept();
|
||||
}
|
||||
else if (isTerRetry(e.result))
|
||||
else if (
|
||||
isTerRetry(e.result) || isTelLocal(e.result) ||
|
||||
isTefFailure(e.result))
|
||||
{
|
||||
if (e.failType != FailHard::yes)
|
||||
{
|
||||
// transaction should be held
|
||||
JLOG(m_journal.debug())
|
||||
<< "Transaction should be held: " << e.result;
|
||||
e.transaction->setStatus(HELD);
|
||||
m_ledgerMaster.addHeldTransaction(e.transaction);
|
||||
e.transaction->setKept();
|
||||
auto const lastLedgerSeq =
|
||||
e.transaction->getSTransaction()->at(
|
||||
~sfLastLedgerSequence);
|
||||
auto const ledgersLeft = lastLedgerSeq
|
||||
? *lastLedgerSeq -
|
||||
m_ledgerMaster.getCurrentLedgerIndex()
|
||||
: std::optional<LedgerIndex>{};
|
||||
// If any of these conditions are met, the transaction can
|
||||
// be held:
|
||||
// 1. It was submitted locally. (Note that this flag is only
|
||||
// true on the initial submission.)
|
||||
// 2. The transaction has a LastLedgerSequence, and the
|
||||
// LastLedgerSequence is fewer than LocalTxs::holdLedgers
|
||||
// (5) ledgers into the future. (Remember that an
|
||||
// unseated optional compares as less than all seated
|
||||
// values, so it has to be checked explicitly first.)
|
||||
// 3. The SF_HELD flag is not set on the txID. (setFlags
|
||||
// checks before setting. If the flag is set, it returns
|
||||
// false, which means it's been held once without one of
|
||||
// the other conditions, so don't hold it again. Time's
|
||||
// up!)
|
||||
//
|
||||
if (e.local ||
|
||||
(ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
|
||||
app_.getHashRouter().setFlags(
|
||||
e.transaction->getID(), SF_HELD))
|
||||
{
|
||||
// transaction should be held
|
||||
JLOG(m_journal.debug())
|
||||
<< "Transaction should be held: " << e.result;
|
||||
e.transaction->setStatus(HELD);
|
||||
m_ledgerMaster.addHeldTransaction(e.transaction);
|
||||
e.transaction->setKept();
|
||||
}
|
||||
else
|
||||
JLOG(m_journal.debug())
|
||||
<< "Not holding transaction "
|
||||
<< e.transaction->getID() << ": "
|
||||
<< (e.local ? "local" : "network") << ", "
|
||||
<< "result: " << e.result << " ledgers left: "
|
||||
<< (ledgersLeft ? to_string(*ledgersLeft)
|
||||
: "unspecified");
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -1549,8 +1698,11 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
if (mTransactions.empty())
|
||||
mTransactions.swap(submit_held);
|
||||
else
|
||||
{
|
||||
mTransactions.reserve(mTransactions.size() + submit_held.size());
|
||||
for (auto& e : submit_held)
|
||||
mTransactions.push_back(std::move(e));
|
||||
}
|
||||
}
|
||||
|
||||
mCond.notify_all();
|
||||
|
||||
@@ -42,6 +42,7 @@ class Peer;
|
||||
class LedgerMaster;
|
||||
class Transaction;
|
||||
class ValidatorKeys;
|
||||
class CanonicalTXSet;
|
||||
|
||||
// This is the primary interface into the "client" portion of the program.
|
||||
// Code that wants to do normal operations on the network such as
|
||||
@@ -140,6 +141,15 @@ public:
|
||||
bool bLocal,
|
||||
FailHard failType) = 0;
|
||||
|
||||
/**
|
||||
* Process a set of transactions synchronously, and ensuring that they are
|
||||
* processed in one batch.
|
||||
*
|
||||
* @param set Transaction object set
|
||||
*/
|
||||
virtual void
|
||||
processTransactionSet(CanonicalTXSet const& set) = 0;
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
//
|
||||
// Owner functions
|
||||
|
||||
@@ -152,6 +152,8 @@ public:
|
||||
void
|
||||
setApplying()
|
||||
{
|
||||
// Note that all access to mApplying are made by NetworkOPsImp, and must
|
||||
// be done under that class's lock.
|
||||
mApplying = true;
|
||||
}
|
||||
|
||||
@@ -163,6 +165,8 @@ public:
|
||||
bool
|
||||
getApplying()
|
||||
{
|
||||
// Note that all access to mApplying are made by NetworkOPsImp, and must
|
||||
// be done under that class's lock.
|
||||
return mApplying;
|
||||
}
|
||||
|
||||
@@ -172,6 +176,8 @@ public:
|
||||
void
|
||||
clearApplying()
|
||||
{
|
||||
// Note that all access to mApplying are made by NetworkOPsImp, and must
|
||||
// be done under that class's lock.
|
||||
mApplying = false;
|
||||
}
|
||||
|
||||
@@ -396,6 +402,24 @@ private:
|
||||
std::optional<uint32_t> mNetworkID;
|
||||
TransStatus mStatus = INVALID;
|
||||
TER mResult = temUNCERTAIN;
|
||||
/* Note that all access to mApplying are made by NetworkOPsImp,
|
||||
and must be done under that class's lock. This avoids the overhead of
|
||||
taking a separate lock, and the consequences of a race condition are
|
||||
nearly-zero.
|
||||
|
||||
1. If there is a race condition, and getApplying returns false when it
|
||||
should be true, the transaction will be processed again. Not that
|
||||
big a deal if it's a rare one-off. Most of the time, it'll get
|
||||
tefALREADY or tefPAST_SEQ.
|
||||
2. On the flip side, if it returns true, when it should be false, then
|
||||
the transaction must have been attempted recently, so no big deal if
|
||||
it doesn't immediately get tried right away.
|
||||
3. If there's a race between setApplying and clearApplying, and the
|
||||
flag ends up set, then a batch is about to try to process the
|
||||
transaction and will call clearApplying later. If it ends up
|
||||
cleared, then it might get attempted again later as is the case with
|
||||
item 1.
|
||||
*/
|
||||
bool mApplying = false;
|
||||
|
||||
/** different ways for transaction to be accepted */
|
||||
|
||||
@@ -41,8 +41,7 @@ convertBlobsToTxResult(
|
||||
|
||||
auto tr = std::make_shared<Transaction>(txn, reason, app);
|
||||
|
||||
auto metaset =
|
||||
std::make_shared<TxMeta>(tr->getID(), tr->getLedger(), rawMeta);
|
||||
auto metaset = std::make_shared<TxMeta>(tr->getID(), ledger_index, rawMeta);
|
||||
|
||||
// if properly formed meta is available we can use it to generate ctid
|
||||
if (metaset->getAsObject().isFieldPresent(sfTransactionIndex))
|
||||
|
||||
@@ -102,7 +102,7 @@ ValidatorSite::ValidatorSite(
|
||||
ValidatorSite::~ValidatorSite()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock{state_mutex_};
|
||||
if (timer_.expires_at() > clock_type::time_point{})
|
||||
if (timer_.expiry() > clock_type::time_point{})
|
||||
{
|
||||
if (!stopping_)
|
||||
{
|
||||
@@ -168,7 +168,7 @@ ValidatorSite::start()
|
||||
{
|
||||
std::lock_guard l0{sites_mutex_};
|
||||
std::lock_guard l1{state_mutex_};
|
||||
if (timer_.expires_at() == clock_type::time_point{})
|
||||
if (timer_.expiry() == clock_type::time_point{})
|
||||
setTimer(l0, l1);
|
||||
}
|
||||
|
||||
|
||||
@@ -96,6 +96,9 @@ public:
|
||||
void
|
||||
onResolve(error_code const& ec, results_type results);
|
||||
|
||||
void
|
||||
onConnect(error_code const& ec, endpoint_type const& endpoint);
|
||||
|
||||
void
|
||||
onStart();
|
||||
|
||||
@@ -195,46 +198,26 @@ WorkBase<Impl>::onResolve(error_code const& ec, results_type results)
|
||||
if (ec)
|
||||
return fail(ec);
|
||||
|
||||
// Use last endpoint if it is successfully connected
|
||||
// and is in the list, otherwise pick a random endpoint
|
||||
// from the list (excluding last endpoint). If there is
|
||||
// only one endpoint and it is the last endpoint then
|
||||
// use the last endpoint.
|
||||
lastEndpoint_ = [&]() -> endpoint_type {
|
||||
int foundIndex = 0;
|
||||
auto const foundIt = std::find_if(
|
||||
results.begin(), results.end(), [&](endpoint_type const& e) {
|
||||
if (e == lastEndpoint_)
|
||||
return true;
|
||||
foundIndex++;
|
||||
return false;
|
||||
});
|
||||
if (foundIt != results.end() && lastStatus_)
|
||||
return lastEndpoint_;
|
||||
else if (results.size() == 1)
|
||||
return *results.begin();
|
||||
else if (foundIt == results.end())
|
||||
return *std::next(results.begin(), rand_int(results.size() - 1));
|
||||
|
||||
// lastEndpoint_ is part of the collection
|
||||
// Pick a random number from the n-1 valid choices, if we use
|
||||
// this as an index, note the last element will never be chosen
|
||||
// and the `lastEndpoint_` index may be chosen. So when the
|
||||
// `lastEndpoint_` index is chosen, that is treated as if the
|
||||
// last element was chosen.
|
||||
auto randIndex =
|
||||
(results.size() > 2) ? rand_int(results.size() - 2) : 0;
|
||||
if (randIndex == foundIndex)
|
||||
randIndex = results.size() - 1;
|
||||
return *std::next(results.begin(), randIndex);
|
||||
}();
|
||||
|
||||
socket_.async_connect(
|
||||
lastEndpoint_,
|
||||
boost::asio::async_connect(
|
||||
socket_,
|
||||
results,
|
||||
strand_.wrap(std::bind(
|
||||
&Impl::onConnect,
|
||||
&WorkBase::onConnect,
|
||||
impl().shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
template <class Impl>
|
||||
void
|
||||
WorkBase<Impl>::onConnect(error_code const& ec, endpoint_type const& endpoint)
|
||||
{
|
||||
lastEndpoint_ = endpoint;
|
||||
|
||||
if (ec)
|
||||
return fail(ec);
|
||||
|
||||
impl().onConnect(ec);
|
||||
}
|
||||
|
||||
template <class Impl>
|
||||
|
||||
@@ -45,8 +45,8 @@ static constexpr auto IDLED = std::chrono::seconds{8};
|
||||
// of messages from the validator. We add peers who reach
|
||||
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
|
||||
// reach MAX_MESSAGE_THRESHOLD.
|
||||
static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 9;
|
||||
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 10;
|
||||
static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19;
|
||||
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20;
|
||||
// Max selected peers to choose as the source of messages from validator
|
||||
static constexpr uint16_t MAX_SELECTED_PEERS = 5;
|
||||
// Wait before reduce-relay feature is enabled on boot up to let
|
||||
|
||||
@@ -58,9 +58,6 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
|
||||
|
||||
/** How often we PING the peer to check for latency and sendq probe */
|
||||
std::chrono::seconds constexpr peerTimerInterval{60};
|
||||
|
||||
uint16_t constexpr maxPingNumber = 5;
|
||||
|
||||
} // namespace
|
||||
|
||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||
@@ -97,7 +94,6 @@ PeerImp::PeerImp(
|
||||
, publicKey_(publicKey)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, lastMessageTime_(clock_type::now())
|
||||
, squelch_(app_.journal("Squelch"))
|
||||
, usage_(consumer)
|
||||
, fee_{Resource::feeTrivialPeer, ""}
|
||||
@@ -116,10 +112,7 @@ PeerImp::PeerImp(
|
||||
headers_,
|
||||
FEATURE_TXRR,
|
||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
||||
, vpReduceRelayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_VPRR,
|
||||
app_.config().VP_REDUCE_RELAY_ENABLE))
|
||||
, vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
|
||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_LEDGER_REPLAY,
|
||||
@@ -736,32 +729,8 @@ PeerImp::onTimer(error_code const& ec)
|
||||
// Already waiting for PONG
|
||||
if (lastPingSeq_)
|
||||
{
|
||||
pingAttempts_++;
|
||||
|
||||
auto now = clock_type::now();
|
||||
auto lastMessageTimeCopy =
|
||||
lastMessageTime_.load(std::memory_order_relaxed);
|
||||
|
||||
if ((now - lastMessageTimeCopy) < peerTimerInterval)
|
||||
{
|
||||
JLOG(journal_.info()) << "Message received within PingPong window, "
|
||||
"skipping disconnect.";
|
||||
pingAttempts_ = 0; // Reset attempts
|
||||
}
|
||||
else if (pingAttempts_ >= maxPingNumber)
|
||||
{
|
||||
fail("Ping Timeout");
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(journal_.info()) << "Missing Pong, sending Ping, attempt "
|
||||
<< pingAttempts_ << " of " << maxPingNumber;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pingAttempts_ = 0;
|
||||
fail("Ping Timeout");
|
||||
return;
|
||||
}
|
||||
|
||||
lastPingTime_ = clock_type::now();
|
||||
@@ -1096,8 +1065,6 @@ PeerImp::onMessageEnd(
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
auto const s = m->list_size();
|
||||
|
||||
if (s == 0)
|
||||
@@ -1135,7 +1102,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
|
||||
if (m->seq() == lastPingSeq_)
|
||||
{
|
||||
lastPingSeq_.reset();
|
||||
pingAttempts_ = 0;
|
||||
|
||||
// Update latency estimate
|
||||
auto const rtt = std::chrono::round<std::chrono::milliseconds>(
|
||||
@@ -1156,8 +1122,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
// VFALCO NOTE I think we should drop the peer immediately
|
||||
if (!cluster())
|
||||
{
|
||||
@@ -1230,8 +1194,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
// Don't allow endpoints from peers that are not known tracking or are
|
||||
// not using a version of the message that we support:
|
||||
if (tracking_.load() != Tracking::converged || m->version() != 2)
|
||||
@@ -1289,8 +1251,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
handleTransaction(m, true, false);
|
||||
}
|
||||
|
||||
@@ -1407,8 +1367,6 @@ PeerImp::handleTransaction(
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
auto badData = [&](std::string const& msg) {
|
||||
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
|
||||
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
|
||||
@@ -1498,8 +1456,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
@@ -1539,8 +1495,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
fee_.update(
|
||||
@@ -1557,8 +1511,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
@@ -1598,8 +1550,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
fee_.update(
|
||||
@@ -1616,8 +1566,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
auto badData = [&](std::string const& msg) {
|
||||
fee_.update(Resource::feeInvalidData, msg);
|
||||
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
|
||||
@@ -1709,8 +1657,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
protocol::TMProposeSet& set = *m;
|
||||
|
||||
auto const sig = makeSlice(set.signature());
|
||||
@@ -1833,8 +1779,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
JLOG(p_journal_.trace()) << "Status: Change";
|
||||
|
||||
if (!m->has_networktime())
|
||||
@@ -2051,8 +1995,6 @@ PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
if (!stringIsUint256Sized(m->hash()))
|
||||
{
|
||||
fee_.update(Resource::feeMalformedRequest, "bad hash");
|
||||
@@ -2293,8 +2235,6 @@ PeerImp::onValidatorListMessage(
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
try
|
||||
{
|
||||
if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
|
||||
@@ -2325,8 +2265,6 @@ void
|
||||
PeerImp::onMessage(
|
||||
std::shared_ptr<protocol::TMValidatorListCollection> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
try
|
||||
{
|
||||
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
||||
@@ -2366,8 +2304,6 @@ PeerImp::onMessage(
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
if (m->validation().size() < 50)
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Validation: Too small";
|
||||
@@ -2495,8 +2431,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
protocol::TMGetObjectByHash& packet = *m;
|
||||
|
||||
JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
|
||||
@@ -2653,8 +2587,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
if (!txReduceRelayEnabled())
|
||||
{
|
||||
JLOG(p_journal_.error())
|
||||
@@ -2724,8 +2656,6 @@ PeerImp::handleHaveTransactions(
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
if (!txReduceRelayEnabled())
|
||||
{
|
||||
JLOG(p_journal_.error())
|
||||
@@ -2750,8 +2680,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
||||
{
|
||||
lastMessageTime_ = clock_type::now();
|
||||
|
||||
using on_message_fn =
|
||||
void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
|
||||
if (!strand_.running_in_this_thread())
|
||||
@@ -2774,16 +2702,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
||||
}
|
||||
PublicKey key(slice);
|
||||
|
||||
// Ignore non-validator squelch
|
||||
if (!app_.validators().listed(key))
|
||||
{
|
||||
fee_.update(Resource::feeInvalidData, "squelch non-validator");
|
||||
JLOG(p_journal_.debug())
|
||||
<< "onMessage: TMSquelch discarding non-validator squelch "
|
||||
<< slice;
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore the squelch for validator's own messages.
|
||||
if (key == app_.getValidationPublicKey())
|
||||
{
|
||||
|
||||
@@ -112,10 +112,8 @@ private:
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
uint16_t pingAttempts_ = 0;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
std::atomic<clock_type::time_point> lastMessageTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
inline static std::atomic_bool reduceRelayReady_{false};
|
||||
@@ -707,10 +705,7 @@ PeerImp::PeerImp(
|
||||
headers_,
|
||||
FEATURE_TXRR,
|
||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
||||
, vpReduceRelayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_VPRR,
|
||||
app_.config().VP_REDUCE_RELAY_ENABLE))
|
||||
, vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
|
||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_LEDGER_REPLAY,
|
||||
|
||||
@@ -43,19 +43,10 @@ public:
|
||||
updateHash();
|
||||
}
|
||||
|
||||
SHAMapAccountStateLeafNode(
|
||||
boost::intrusive_ptr<SHAMapItem const> item,
|
||||
std::uint32_t cowid,
|
||||
SHAMapHash const& hash)
|
||||
: SHAMapLeafNode(std::move(item), cowid, hash)
|
||||
{
|
||||
}
|
||||
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
clone(std::uint32_t cowid) const final override
|
||||
{
|
||||
return intr_ptr::make_shared<SHAMapAccountStateLeafNode>(
|
||||
item_, cowid, hash_);
|
||||
return intr_ptr::make_shared<SHAMapAccountStateLeafNode>(item_, cowid);
|
||||
}
|
||||
|
||||
SHAMapNodeType
|
||||
@@ -67,8 +58,11 @@ public:
|
||||
void
|
||||
updateHash() final override
|
||||
{
|
||||
hash_ = SHAMapHash{
|
||||
sha512Half(HashPrefix::leafNode, item_->slice(), item_->key())};
|
||||
// hash_ = SHAMapHash{
|
||||
// sha512Half(HashPrefix::leafNode, item_->slice(), item_->key())};
|
||||
|
||||
// TODO return SHAMapHash{
|
||||
// sha512Half(HashPrefix::leafNode, item_->slice(), item_->key())};
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -36,11 +36,6 @@ protected:
|
||||
boost::intrusive_ptr<SHAMapItem const> item,
|
||||
std::uint32_t cowid);
|
||||
|
||||
SHAMapLeafNode(
|
||||
boost::intrusive_ptr<SHAMapItem const> item,
|
||||
std::uint32_t cowid,
|
||||
SHAMapHash const& hash);
|
||||
|
||||
public:
|
||||
SHAMapLeafNode(const SHAMapLeafNode&) = delete;
|
||||
SHAMapLeafNode&
|
||||
|
||||
@@ -51,8 +51,6 @@ enum class SHAMapNodeType {
|
||||
class SHAMapTreeNode : public IntrusiveRefCounts
|
||||
{
|
||||
protected:
|
||||
SHAMapHash hash_;
|
||||
|
||||
/** Determines the owning SHAMap, if any. Used for copy-on-write semantics.
|
||||
|
||||
If this value is 0, the node is not dirty and does not need to be
|
||||
@@ -75,13 +73,6 @@ protected:
|
||||
explicit SHAMapTreeNode(std::uint32_t cowid) noexcept : cowid_(cowid)
|
||||
{
|
||||
}
|
||||
|
||||
explicit SHAMapTreeNode(
|
||||
std::uint32_t cowid,
|
||||
SHAMapHash const& hash) noexcept
|
||||
: hash_(hash), cowid_(cowid)
|
||||
{
|
||||
}
|
||||
/** @} */
|
||||
|
||||
public:
|
||||
@@ -144,7 +135,9 @@ public:
|
||||
SHAMapHash const&
|
||||
getHash() const
|
||||
{
|
||||
return hash_;
|
||||
// TODO
|
||||
static SHAMapHash s;
|
||||
return s;
|
||||
}
|
||||
|
||||
/** Determines the type of node. */
|
||||
|
||||
@@ -42,18 +42,10 @@ public:
|
||||
updateHash();
|
||||
}
|
||||
|
||||
SHAMapTxLeafNode(
|
||||
boost::intrusive_ptr<SHAMapItem const> item,
|
||||
std::uint32_t cowid,
|
||||
SHAMapHash const& hash)
|
||||
: SHAMapLeafNode(std::move(item), cowid, hash)
|
||||
{
|
||||
}
|
||||
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
clone(std::uint32_t cowid) const final override
|
||||
{
|
||||
return intr_ptr::make_shared<SHAMapTxLeafNode>(item_, cowid, hash_);
|
||||
return intr_ptr::make_shared<SHAMapTxLeafNode>(item_, cowid);
|
||||
}
|
||||
|
||||
SHAMapNodeType
|
||||
@@ -65,8 +57,8 @@ public:
|
||||
void
|
||||
updateHash() final override
|
||||
{
|
||||
hash_ =
|
||||
SHAMapHash{sha512Half(HashPrefix::transactionID, item_->slice())};
|
||||
// hash_ =
|
||||
// SHAMapHash{sha512Half(HashPrefix::transactionID, item_->slice())};
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -43,19 +43,10 @@ public:
|
||||
updateHash();
|
||||
}
|
||||
|
||||
SHAMapTxPlusMetaLeafNode(
|
||||
boost::intrusive_ptr<SHAMapItem const> item,
|
||||
std::uint32_t cowid,
|
||||
SHAMapHash const& hash)
|
||||
: SHAMapLeafNode(std::move(item), cowid, hash)
|
||||
{
|
||||
}
|
||||
|
||||
intr_ptr::SharedPtr<SHAMapTreeNode>
|
||||
clone(std::uint32_t cowid) const override
|
||||
{
|
||||
return intr_ptr::make_shared<SHAMapTxPlusMetaLeafNode>(
|
||||
item_, cowid, hash_);
|
||||
return intr_ptr::make_shared<SHAMapTxPlusMetaLeafNode>(item_, cowid);
|
||||
}
|
||||
|
||||
SHAMapNodeType
|
||||
@@ -67,8 +58,8 @@ public:
|
||||
void
|
||||
updateHash() final override
|
||||
{
|
||||
hash_ = SHAMapHash{
|
||||
sha512Half(HashPrefix::txNode, item_->slice(), item_->key())};
|
||||
// hash_ = SHAMapHash{
|
||||
// sha512Half(HashPrefix::txNode, item_->slice(), item_->key())};
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -83,7 +83,6 @@ SHAMapInnerNode::clone(std::uint32_t cowid) const
|
||||
auto const branchCount = getBranchCount();
|
||||
auto const thisIsSparse = !hashesAndChildren_.isDense();
|
||||
auto p = intr_ptr::make_shared<SHAMapInnerNode>(cowid, branchCount);
|
||||
p->hash_ = hash_;
|
||||
p->isBranch_ = isBranch_;
|
||||
p->fullBelowGen_ = fullBelowGen_;
|
||||
SHAMapHash *cloneHashes, *thisHashes;
|
||||
@@ -154,10 +153,11 @@ SHAMapInnerNode::makeFullInner(
|
||||
|
||||
ret->resizeChildArrays(ret->getBranchCount());
|
||||
|
||||
if (hashValid)
|
||||
ret->hash_ = hash;
|
||||
else
|
||||
ret->updateHash();
|
||||
// TODO: do this outside?
|
||||
// if (hashValid)
|
||||
// ret->hash_ = hash;
|
||||
// else
|
||||
// ret->updateHash();
|
||||
|
||||
return ret;
|
||||
}
|
||||
@@ -210,7 +210,9 @@ SHAMapInnerNode::updateHash()
|
||||
iterChildren([&](SHAMapHash const& hh) { hash_append(h, hh); });
|
||||
nh = static_cast<typename sha512_half_hasher::result_type>(h);
|
||||
}
|
||||
hash_ = SHAMapHash{nh};
|
||||
|
||||
// TODO return SHAMapHash{nh};
|
||||
// hash_ = SHAMapHash{nh};
|
||||
}
|
||||
|
||||
void
|
||||
@@ -314,8 +316,6 @@ SHAMapInnerNode::setChild(int m, intr_ptr::SharedPtr<SHAMapTreeNode> child)
|
||||
children[childIndex] = std::move(child);
|
||||
}
|
||||
|
||||
hash_.zero();
|
||||
|
||||
XRPL_ASSERT(
|
||||
getBranchCount() <= hashesAndChildren_.capacity(),
|
||||
"ripple::SHAMapInnerNode::setChild : maximum branch count");
|
||||
@@ -472,15 +472,9 @@ SHAMapInnerNode::invariants(bool is_root) const
|
||||
|
||||
if (!is_root)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
hash_.isNonZero(),
|
||||
"ripple::SHAMapInnerNode::invariants : nonzero hash");
|
||||
XRPL_ASSERT(
|
||||
count >= 1, "ripple::SHAMapInnerNode::invariants : minimum count");
|
||||
}
|
||||
XRPL_ASSERT(
|
||||
(count == 0) ? hash_.isZero() : hash_.isNonZero(),
|
||||
"ripple::SHAMapInnerNode::invariants : hash and count do match");
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
@@ -32,19 +32,6 @@ SHAMapLeafNode::SHAMapLeafNode(
|
||||
"SHAMapItem const>, std::uint32_t) : minimum input size");
|
||||
}
|
||||
|
||||
SHAMapLeafNode::SHAMapLeafNode(
|
||||
boost::intrusive_ptr<SHAMapItem const> item,
|
||||
std::uint32_t cowid,
|
||||
SHAMapHash const& hash)
|
||||
: SHAMapTreeNode(cowid, hash), item_(std::move(item))
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
item_->size() >= 12,
|
||||
"ripple::SHAMapLeafNode::SHAMapLeafNode(boost::intrusive_ptr<"
|
||||
"SHAMapItem const>, std::uint32_t, SHAMapHash const&) : minimum input "
|
||||
"size");
|
||||
}
|
||||
|
||||
boost::intrusive_ptr<SHAMapItem const> const&
|
||||
SHAMapLeafNode::peekItem() const
|
||||
{
|
||||
@@ -57,11 +44,14 @@ SHAMapLeafNode::setItem(boost::intrusive_ptr<SHAMapItem const> item)
|
||||
XRPL_ASSERT(cowid_, "ripple::SHAMapLeafNode::setItem : nonzero cowid");
|
||||
item_ = std::move(item);
|
||||
|
||||
auto const oldHash = hash_;
|
||||
// auto const oldHash = hash_;
|
||||
|
||||
updateHash();
|
||||
// updateHash();
|
||||
|
||||
return (oldHash != hash_);
|
||||
// return (oldHash != hash_);
|
||||
|
||||
// TODO : return updateHash();
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string
|
||||
@@ -82,8 +72,8 @@ SHAMapLeafNode::getString(const SHAMapNodeID& id) const
|
||||
|
||||
ret += " Tag=";
|
||||
ret += to_string(item_->key());
|
||||
ret += "\n Hash=";
|
||||
ret += to_string(hash_);
|
||||
// ret += "\n Hash=";
|
||||
// ret += to_string(hash_);
|
||||
ret += "/";
|
||||
ret += std::to_string(item_->size());
|
||||
return ret;
|
||||
@@ -92,8 +82,6 @@ SHAMapLeafNode::getString(const SHAMapNodeID& id) const
|
||||
void
|
||||
SHAMapLeafNode::invariants(bool) const
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
hash_.isNonZero(), "ripple::SHAMapLeafNode::invariants : nonzero hash");
|
||||
XRPL_ASSERT(item_, "ripple::SHAMapLeafNode::invariants : non-null item");
|
||||
}
|
||||
|
||||
|
||||
@@ -42,8 +42,7 @@ SHAMapTreeNode::makeTransaction(
|
||||
make_shamapitem(sha512Half(HashPrefix::transactionID, data), data);
|
||||
|
||||
if (hashValid)
|
||||
return intr_ptr::make_shared<SHAMapTxLeafNode>(
|
||||
std::move(item), 0, hash);
|
||||
return intr_ptr::make_shared<SHAMapTxLeafNode>(std::move(item), 0);
|
||||
|
||||
return intr_ptr::make_shared<SHAMapTxLeafNode>(std::move(item), 0);
|
||||
}
|
||||
@@ -72,7 +71,7 @@ SHAMapTreeNode::makeTransactionWithMeta(
|
||||
|
||||
if (hashValid)
|
||||
return intr_ptr::make_shared<SHAMapTxPlusMetaLeafNode>(
|
||||
std::move(item), 0, hash);
|
||||
std::move(item), 0);
|
||||
|
||||
return intr_ptr::make_shared<SHAMapTxPlusMetaLeafNode>(std::move(item), 0);
|
||||
}
|
||||
@@ -104,7 +103,7 @@ SHAMapTreeNode::makeAccountState(
|
||||
|
||||
if (hashValid)
|
||||
return intr_ptr::make_shared<SHAMapAccountStateLeafNode>(
|
||||
std::move(item), 0, hash);
|
||||
std::move(item), 0);
|
||||
|
||||
return intr_ptr::make_shared<SHAMapAccountStateLeafNode>(
|
||||
std::move(item), 0);
|
||||
|
||||
Reference in New Issue
Block a user