mirror of https://github.com/XRPLF/rippled.git
synced 2026-01-30 03:25:27 +00:00

Compare commits
35 Commits
bthomee/di ... ximinez/fi

| SHA1 |
|---|
| 4f98033c84 |
| 0f4b201379 |
| 561f2a29d0 |
| 2e14e453ae |
| b54539fb59 |
| 83036e4f77 |
| 0f231c3cd2 |
| 58febcb683 |
| 754757bac9 |
| 859467455c |
| 60dc20042c |
| 108b3e5e2c |
| 599f197109 |
| 36000188b1 |
| 944d758a39 |
| de661f8ef8 |
| 8b70664d4c |
| 4f03625b75 |
| fb0379f93d |
| 8e795197c9 |
| cf7665137e |
| 0aa43e1772 |
| 0834c23f27 |
| cd62ba13ab |
| 31fc348446 |
| 009f17b9bf |
| b0872bae95 |
| 0c69b23b93 |
| a2e93188fc |
| 5406d28357 |
| 7804c09494 |
| 79d294bd2d |
| acace507d0 |
| 98732100fb |
| b186516d0a |
@@ -940,7 +940,23 @@
 #
 # path            Location to store the database
 #
-# Optional keys for NuDB and RocksDB:
+# Optional keys
+#
+# cache_size      Size of cache for database records. Default is 16384.
+#                 Setting this value to 0 will use the default value.
+#
+# cache_age       Length of time in minutes to keep database records
+#                 cached. Default is 5 minutes. Setting this value to
+#                 0 will use the default value.
+#
+#                 Note: if neither cache_size nor cache_age is
+#                 specified, the cache for database records will not
+#                 be created. If only one of cache_size or cache_age
+#                 is specified, the cache will be created using the
+#                 default value for the unspecified parameter.
+#
+#                 Note: the cache will not be created if online_delete
+#                 is specified.
 #
 # fast_load       Boolean. If set, load the last persisted ledger
 #                 from disk upon process start before syncing to
@@ -948,6 +964,8 @@
 #                 if sufficient IOPS capacity is available.
 #                 Default 0.
 #
+# Optional keys for NuDB or RocksDB:
+#
 # earliest_seq    The default is 32570 to match the XRP ledger
 #                 network's earliest allowed sequence. Alternate
 #                 networks may set this value. Minimum value of 1.
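Taken together, these keys land in the [node_db] stanza of rippled.cfg. A minimal sketch, assuming NuDB; the type and path values here are illustrative placeholders, not part of this diff:

    [node_db]
    type=NuDB
    path=/var/lib/rippled/db/nudb
    # Cache up to 262144 records, each for up to 10 minutes
    cache_size=262144
    cache_age=10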
include/xrpl/basics/CanProcess.h (new file, 133 lines)
@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2024 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED

#include <functional>
#include <mutex>
#include <set>

/** RAII class to check if an Item is already being processed on another thread,
 * as indicated by its presence in a Collection.
 *
 * If the Item is not in the Collection, it will be added under lock in the
 * ctor, and removed under lock in the dtor. The object will be considered
 * "usable" and evaluate to `true`.
 *
 * If the Item is in the Collection, no changes will be made to the collection,
 * and the CanProcess object will be considered "unusable".
 *
 * It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
 * Process or skip a block of code, or set a flag.)
 *
 * The current use is to avoid lock contention that would be involved in
 * processing something associated with the Item.
 *
 * Examples:
 *
 *   void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
 *   {
 *       if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
 *       {
 *           acquire(hash, ...);
 *       }
 *   }
 *
 *   bool
 *   NetworkOPsImp::recvValidation(
 *       std::shared_ptr<STValidation> const& val,
 *       std::string const& source)
 *   {
 *       CanProcess check(
 *           validationsMutex_, pendingValidations_, val->getLedgerHash());
 *       BypassAccept bypassAccept =
 *           check ? BypassAccept::no : BypassAccept::yes;
 *       handleNewValidation(app_, val, source, bypassAccept, m_journal);
 *   }
 */
class CanProcess
{
public:
    template <class Mutex, class Collection, class Item>
    CanProcess(Mutex& mtx, Collection& collection, Item const& item)
        : cleanup_(insert(mtx, collection, item))
    {
    }

    ~CanProcess()
    {
        if (cleanup_)
            cleanup_();
    }

    explicit
    operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    template <bool useIterator, class Mutex, class Collection, class Item>
    std::function<void()>
    doInsert(Mutex& mtx, Collection& collection, Item const& item)
    {
        std::unique_lock<Mutex> lock(mtx);
        // TODO: Use structured binding once LLVM 16 is the minimum supported
        // version. See also: https://github.com/llvm/llvm-project/issues/48582
        // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
        auto const insertResult = collection.insert(item);
        auto const it = insertResult.first;
        if (!insertResult.second)
            return {};
        if constexpr (useIterator)
            return [&, it]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(it);
            };
        else
            return [&]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(item);
            };
    }

    // Generic insert() function doesn't use iterators because they may get
    // invalidated
    template <class Mutex, class Collection, class Item>
    std::function<void()>
    insert(Mutex& mtx, Collection& collection, Item const& item)
    {
        return doInsert<false>(mtx, collection, item);
    }

    // Specialize insert() for std::set, which does not invalidate iterators
    // for insert and erase
    template <class Mutex, class Item>
    std::function<void()>
    insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        return doInsert<true>(mtx, collection, item);
    }

    // If set, then the item is "usable"
    std::function<void()> cleanup_;
};

#endif
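As a quick illustration of the RAII semantics (a standalone sketch, not part of the diff; it assumes only the header above):

    #include <xrpl/basics/CanProcess.h>

    #include <cassert>
    #include <mutex>
    #include <set>

    int main()
    {
        std::mutex mtx;
        std::set<int> pending;

        // 42 is not yet pending: it is inserted, and 'outer' is usable
        CanProcess outer(mtx, pending, 42);
        assert(outer);

        {
            // 42 is already pending, so 'inner' is unusable; skip the work
            CanProcess inner(mtx, pending, 42);
            assert(!inner);
        }

        // outer's destructor erases 42, so a later CanProcess for 42
        // would be usable again
    }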
@@ -24,6 +24,32 @@ public:
         beast::Journal j)
         : Database(scheduler, readThreads, config, j), backend_(std::move(backend))
     {
+        std::optional<int> cacheSize, cacheAge;
+
+        if (config.exists("cache_size"))
+        {
+            cacheSize = get<int>(config, "cache_size");
+            if (cacheSize.value() < 0)
+            {
+                Throw<std::runtime_error>("Specified negative value for cache_size");
+            }
+        }
+
+        if (config.exists("cache_age"))
+        {
+            cacheAge = get<int>(config, "cache_age");
+            if (cacheAge.value() < 0)
+            {
+                Throw<std::runtime_error>("Specified negative value for cache_age");
+            }
+        }
+
+        if (cacheSize != 0 || cacheAge != 0)
+        {
+            cache_ = std::make_shared<TaggedCache<uint256, NodeObject>>(
+                "DatabaseNodeImp", cacheSize.value_or(0), std::chrono::minutes(cacheAge.value_or(0)), stopwatch(), j);
+        }
+
         XRPL_ASSERT(
             backend_,
             "xrpl::NodeStore::DatabaseNodeImp::DatabaseNodeImp : non-null "
@@ -82,6 +108,9 @@ public:
     sweep() override;

 private:
+    // Cache for database objects. This cache is not always initialized. Check
+    // for null before using.
     std::shared_ptr<TaggedCache<uint256, NodeObject>> cache_;
+    // Persistent key/value storage
     std::shared_ptr<Backend> backend_;
@@ -286,8 +286,18 @@ message TMLedgerData {
     required uint32 ledgerSeq = 2;
     required TMLedgerInfoType type = 3;
     repeated TMLedgerNode nodes = 4;
+    // If the peer supports "responseCookies", this field will
+    // never be populated.
     optional uint32 requestCookie = 5;
     optional TMReplyError error = 6;
+    // The old field is called "requestCookie", but this is
+    // a response, so this name makes more sense
+    repeated uint32 responseCookies = 7;
+    // If a TMGetLedger request was received without a "requestCookie",
+    // and the peer supports it, this flag will be set to true to
+    // indicate that the receiver should process the result in addition
+    // to forwarding it to its "responseCookies" peers.
+    optional bool directResponse = 8;
 }

 message TMPing {
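To make the fan-out concrete: a responder that owes the same TMLedgerData to several requesters can send a single message carrying every cookie. A hypothetical C++ sketch, assuming the protobuf-generated setters for the fields above; 'cookieA', 'cookieB', 'seq', and 'hash' are invented placeholders (set_directresponse and add_responsecookies do appear in the PeerImp changes later in this compare):

    protocol::TMLedgerData reply;
    reply.set_ledgerhash(hash.data(), hash.size());
    reply.set_ledgerseq(seq);
    reply.set_type(protocol::liTS_CANDIDATE);
    // One requester sent no cookie: it wants the data processed directly
    reply.set_directresponse(true);
    // Two requesters are identified by the cookies from their requests
    reply.add_responsecookies(cookieA);
    reply.add_responsecookies(cookieB);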
@@ -36,6 +36,8 @@ struct LedgerHeader

     // If validated is false, it means "not yet validated."
     // Once validated is true, it will never be set false at a later time.
+    // NOTE: If you are accessing this directly, you are probably doing it
+    // wrong. Use LedgerMaster::isValidated().
     // VFALCO TODO Make this not mutable
     bool mutable validated = false;
     bool accepted = false;
@@ -10,6 +10,11 @@ DatabaseNodeImp::store(NodeObjectType type, Blob&& data, uint256 const& hash, st

     auto obj = NodeObject::createObject(type, std::move(data), hash);
     backend_->store(obj);
+    if (cache_)
+    {
+        // After the store, replace a negative cache entry if there is one
+        cache_->canonicalize(hash, obj, [](std::shared_ptr<NodeObject> const& n) { return n->getType() == hotDUMMY; });
+    }
 }

 void
@@ -18,41 +23,77 @@ DatabaseNodeImp::asyncFetch(
     std::uint32_t ledgerSeq,
     std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
 {
+    if (cache_)
+    {
+        std::shared_ptr<NodeObject> obj = cache_->fetch(hash);
+        if (obj)
+        {
+            callback(obj->getType() == hotDUMMY ? nullptr : obj);
+            return;
+        }
+    }
     Database::asyncFetch(hash, ledgerSeq, std::move(callback));
 }

 void
 DatabaseNodeImp::sweep()
 {
+    if (cache_)
+        cache_->sweep();
 }

 std::shared_ptr<NodeObject>
 DatabaseNodeImp::fetchNodeObject(uint256 const& hash, std::uint32_t, FetchReport& fetchReport, bool duplicate)
 {
-    std::shared_ptr<NodeObject> nodeObject = nullptr;
-    Status status;
+    std::shared_ptr<NodeObject> nodeObject = cache_ ? cache_->fetch(hash) : nullptr;

-    try
+    if (!nodeObject)
     {
-        status = backend_->fetch(hash.data(), &nodeObject);
-    }
-    catch (std::exception const& e)
-    {
-        JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": Exception fetching from backend: " << e.what();
-        Rethrow();
-    }
+        JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record not " << (cache_ ? "cached" : "found");

-    switch (status)
-    {
-        case ok:
-        case notFound:
-            break;
-        case dataCorrupt:
-            JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
-            break;
-        default:
-            JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result " << status;
-            break;
+        Status status;
+
+        try
+        {
+            status = backend_->fetch(hash.data(), &nodeObject);
+        }
+        catch (std::exception const& e)
+        {
+            JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": Exception fetching from backend: " << e.what();
+            Rethrow();
+        }
+
+        switch (status)
+        {
+            case ok:
+                if (cache_)
+                {
+                    if (nodeObject)
+                        cache_->canonicalize_replace_client(hash, nodeObject);
+                    else
+                    {
+                        auto notFound = NodeObject::createObject(hotDUMMY, {}, hash);
+                        cache_->canonicalize_replace_client(hash, notFound);
+                        if (notFound->getType() != hotDUMMY)
+                            nodeObject = notFound;
+                    }
+                }
+                break;
+            case notFound:
+                break;
+            case dataCorrupt:
+                JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
+                break;
+            default:
+                JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result " << status;
+                break;
+        }
+    }
+    else
+    {
+        JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record found in cache";
+        if (nodeObject->getType() == hotDUMMY)
+            nodeObject.reset();
     }

     if (nodeObject)
@@ -64,29 +105,66 @@ DatabaseNodeImp::fetchNodeObject(uint256 const& hash, std::uint32_t, FetchReport
 std::vector<std::shared_ptr<NodeObject>>
 DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
 {
+    std::vector<std::shared_ptr<NodeObject>> results{hashes.size()};
     using namespace std::chrono;
     auto const before = steady_clock::now();

-    std::vector<uint256 const*> batch{hashes.size()};
+    std::unordered_map<uint256 const*, size_t> indexMap;
+    std::vector<uint256 const*> cacheMisses;
+    uint64_t hits = 0;
+    uint64_t fetches = 0;
     for (size_t i = 0; i < hashes.size(); ++i)
     {
         auto const& hash = hashes[i];
-        batch.push_back(&hash);
-    }
-
-    std::vector<std::shared_ptr<NodeObject>> results{hashes.size()};
-    results = backend_->fetchBatch(batch).first;
-    for (size_t i = 0; i < results.size(); ++i)
-    {
-        if (!results[i])
+        // See if the object already exists in the cache
+        auto nObj = cache_ ? cache_->fetch(hash) : nullptr;
+        ++fetches;
+        if (!nObj)
         {
-            JLOG(j_.error()) << "fetchBatch - "
-                             << "record not found in db. hash = " << strHex(hashes[i]);
+            // Try the database
+            indexMap[&hash] = i;
+            cacheMisses.push_back(&hash);
+        }
+        else
+        {
+            results[i] = nObj->getType() == hotDUMMY ? nullptr : nObj;
+            // It was in the cache.
+            ++hits;
         }
     }

+    JLOG(j_.debug()) << "fetchBatch - cache hits = " << (hashes.size() - cacheMisses.size())
+                     << " - cache misses = " << cacheMisses.size();
+    auto dbResults = backend_->fetchBatch(cacheMisses).first;
+
+    for (size_t i = 0; i < dbResults.size(); ++i)
+    {
+        auto nObj = std::move(dbResults[i]);
+        size_t index = indexMap[cacheMisses[i]];
+        auto const& hash = hashes[index];
+
+        if (nObj)
+        {
+            // Ensure all threads get the same object
+            if (cache_)
+                cache_->canonicalize_replace_client(hash, nObj);
+        }
+        else
+        {
+            JLOG(j_.error()) << "fetchBatch - "
+                             << "record not found in db or cache. hash = " << strHex(hash);
+            if (cache_)
+            {
+                auto notFound = NodeObject::createObject(hotDUMMY, {}, hash);
+                cache_->canonicalize_replace_client(hash, notFound);
+                if (notFound->getType() != hotDUMMY)
+                    nObj = std::move(notFound);
+            }
+        }
+        results[index] = std::move(nObj);
+    }
+
     auto fetchDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(steady_clock::now() - before).count();
-    updateFetchMetrics(hashes.size(), 0, fetchDurationUs);
+    updateFetchMetrics(fetches, hits, fetchDurationUs);
     return results;
 }
@@ -387,6 +387,33 @@ class HashRouter_test : public beast::unit_test::suite
         BEAST_EXPECT(!any(HF::UNDEFINED));
     }

+    void
+    testProcessPeer()
+    {
+        using namespace std::chrono_literals;
+        TestStopwatch stopwatch;
+        HashRouter router(getSetup(5s, 5s), stopwatch);
+        uint256 const key(1);
+        HashRouter::PeerShortID peer1 = 1;
+        HashRouter::PeerShortID peer2 = 2;
+        auto const timeout = 2s;
+
+        BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
+        BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
+        ++stopwatch;
+        BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
+        BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
+        BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
+        ++stopwatch;
+        BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
+        BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
+        ++stopwatch;
+        BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
+        ++stopwatch;
+        BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
+        BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
+    }
+
 public:
     void
     run() override
@@ -399,6 +426,7 @@ public:
         testProcess();
         testSetup();
         testFlagsOps();
+        testProcessPeer();
     }
 };
@@ -288,6 +288,11 @@ public:
     {
         return false;
     }
+    std::set<std::optional<uint64_t>>
+    releaseRequestCookies(uint256 const& requestHash) override
+    {
+        return {};
+    }

     std::string const&
     fingerprint() const override
@@ -490,8 +490,19 @@ public:
         Env env(*this, envconfig(onlineDelete));

         /////////////////////////////////////////////////////////////
-        // Create NodeStore with two backends to allow online deletion of data.
-        // Normally, SHAMapStoreImp handles all these details.
+        // Create the backend. Normally, SHAMapStoreImp handles all these
+        // details
+        auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
+
+        // Provide default values:
+        if (!nscfg.exists("cache_size"))
+            nscfg.set(
+                "cache_size", std::to_string(env.app().config().getValueFor(SizedItem::treeCacheSize, std::nullopt)));
+
+        if (!nscfg.exists("cache_age"))
+            nscfg.set(
+                "cache_age", std::to_string(env.app().config().getValueFor(SizedItem::treeCacheAge, std::nullopt)));

         NodeStoreScheduler scheduler(env.app().getJobQueue());

         std::string const writableDb = "write";
@@ -499,8 +510,9 @@ public:
         auto writableBackend = makeBackendRotating(env, scheduler, writableDb);
         auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);

+        // Create NodeStore with two backends to allow online deletion of
+        // data
         constexpr int readThreads = 4;
-        auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
         auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
             scheduler,
             readThreads,
@@ -64,7 +64,9 @@ public:
         BEAST_EXPECT(negotiateProtocolVersion("RTXP/1.2") == std::nullopt);
         BEAST_EXPECT(negotiateProtocolVersion("RTXP/1.2, XRPL/2.0, XRPL/2.1") == make_protocol(2, 1));
         BEAST_EXPECT(negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
-        BEAST_EXPECT(negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") == make_protocol(2, 2));
+        BEAST_EXPECT(
+            negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
+            make_protocol(2, 3));
         BEAST_EXPECT(negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt);
         BEAST_EXPECT(negotiateProtocolVersion("") == std::nullopt);
     }
@@ -167,6 +167,11 @@ public:
     removeTxQueue(uint256 const&) override
     {
     }
+    std::set<std::optional<uint64_t>>
+    releaseRequestCookies(uint256 const& requestHash) override
+    {
+        return {};
+    }
 };

 /** Manually advanced clock. */
@@ -918,7 +918,7 @@ void
 RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
 {
     if (!positions && app_.getOPs().isFull())
-        app_.getOPs().setMode(OperatingMode::CONNECTED);
+        app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
 }

 void
@@ -172,6 +172,24 @@ private:
     std::unique_ptr<PeerSet> mPeerSet;
 };

+inline std::string
+to_string(InboundLedger::Reason reason)
+{
+    using enum InboundLedger::Reason;
+    switch (reason)
+    {
+        case HISTORY:
+            return "HISTORY";
+        case GENERIC:
+            return "GENERIC";
+        case CONSENSUS:
+            return "CONSENSUS";
+        default:
+            UNREACHABLE("ripple::to_string(InboundLedger::Reason) : unknown value");
+            return "unknown";
+    }
+}
+
 } // namespace xrpl

 #endif
@@ -344,7 +344,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)

     if (!wasProgress)
     {
-        checkLocal();
+        if (checkLocal())
+        {
+            // Done. Something else (probably consensus) built the ledger
+            // locally while waiting for data (or possibly before requesting)
+            XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
+            JLOG(journal_.info()) << "Finished while waiting " << hash_;
+            return;
+        }

         mByHash = true;
@@ -3,9 +3,9 @@
 #include <xrpld/app/main/Application.h>
 #include <xrpld/app/misc/NetworkOPs.h>

+#include <xrpl/basics/CanProcess.h>
 #include <xrpl/basics/DecayingSample.h>
 #include <xrpl/basics/Log.h>
-#include <xrpl/basics/scope.h>
 #include <xrpl/beast/container/aged_map.h>
 #include <xrpl/core/JobQueue.h>
 #include <xrpl/core/PerfLog.h>
@@ -53,10 +53,77 @@ public:
         auto doAcquire = [&, seq, reason]() -> std::shared_ptr<Ledger const> {
             XRPL_ASSERT(hash.isNonZero(), "xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");

-            // probably not the right rule
-            if (app_.getOPs().isNeedNetworkLedger() && (reason != InboundLedger::Reason::GENERIC) &&
-                (reason != InboundLedger::Reason::CONSENSUS))
-                return {};
+            bool const needNetworkLedger = app_.getOPs().isNeedNetworkLedger();
+            bool const shouldAcquire = [&]() {
+                if (!needNetworkLedger)
+                    return true;
+                if (reason == InboundLedger::Reason::GENERIC)
+                    return true;
+                if (reason == InboundLedger::Reason::CONSENSUS)
+                    return true;
+                return false;
+            }();
+
+            std::stringstream ss;
+            ss << "InboundLedger::acquire: "
+               << "Request: " << to_string(hash) << ", " << seq
+               << " NeedNetworkLedger: " << (needNetworkLedger ? "yes" : "no") << " Reason: " << to_string(reason)
+               << " Should acquire: " << (shouldAcquire ? "true." : "false.");
+
+            /* Acquiring ledgers is somewhat expensive. It requires lots of
+             * computation and network communication. Avoid it when it's not
+             * appropriate. Every validation from a peer for a ledger that
+             * we do not have locally results in a call to this function: even
+             * if we are moments away from validating the same ledger.
+             */
+            bool const shouldBroadcast = [&]() {
+                // If the node is not in "full" state, it needs to sync to
+                // the network, and doesn't have the necessary tx's and
+                // ledger entries to build the ledger.
+                bool const isFull = app_.getOPs().isFull();
+                // If everything else is ok, don't try to acquire the ledger
+                // if the requested seq is in the near future relative to
+                // the validated ledger. If the requested ledger is between
+                // 1 and 19 inclusive ledgers ahead of the valid ledger this
+                // node has not built it yet, but it's possible/likely it
+                // has the tx's necessary to build it and get caught up.
+                // Plus it might not become validated. On the other hand, if
+                // it's more than 20 in the future, this node should request
+                // it so that it can jump ahead and get caught up.
+                LedgerIndex const validSeq = app_.getLedgerMaster().getValidLedgerIndex();
+                constexpr std::size_t lagLeeway = 20;
+                bool const nearFuture = (seq > validSeq) && (seq < validSeq + lagLeeway);
+                // If everything else is ok, don't try to acquire the ledger
+                // if the request is related to consensus. (Note that
+                // consensus calls usually pass a seq of 0, so nearFuture
+                // will be false other than on a brand new network.)
+                bool const consensus = reason == InboundLedger::Reason::CONSENSUS;
+                ss << " Evaluating whether to broadcast requests to peers"
+                   << ". full: " << (isFull ? "true" : "false") << ". ledger sequence " << seq
+                   << ". Valid sequence: " << validSeq << ". Lag leeway: " << lagLeeway
+                   << ". request for near future ledger: " << (nearFuture ? "true" : "false")
+                   << ". Consensus: " << (consensus ? "true" : "false");
+
+                // If the node is not synced, send requests.
+                if (!isFull)
+                    return true;
+                // If the ledger is in the near future, do NOT send requests.
+                // This node is probably about to build it.
+                if (nearFuture)
+                    return false;
+                // If the request is because of consensus, do NOT send requests.
+                // This node is probably about to build it.
+                if (consensus)
+                    return false;
+                return true;
+            }();
+            ss << ". Would broadcast to peers? " << (shouldBroadcast ? "true." : "false.");
+
+            if (!shouldAcquire)
+            {
+                JLOG(j_.debug()) << "Abort(rule): " << ss.str();
+                return {};
+            }

             bool isNew = true;
             std::shared_ptr<InboundLedger> inbound;
@@ -64,6 +131,7 @@ public:
             ScopedLockType sl(mLock);
             if (stopping_)
             {
+                JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
                 return {};
             }

@@ -82,46 +150,50 @@ public:
                     ++mCounter;
                 }
             }
+            ss << " IsNew: " << (isNew ? "true" : "false");

             if (inbound->isFailed())
+            {
+                JLOG(j_.debug()) << "Abort(failed): " << ss.str();
                 return {};
+            }

             if (!isNew)
                 inbound->update(seq);

             if (!inbound->isComplete())
+            {
+                JLOG(j_.debug()) << "InProgress: " << ss.str();
                 return {};
+            }

+            JLOG(j_.debug()) << "Complete: " << ss.str();
             return inbound->getLedger();
         };
         using namespace std::chrono_literals;
-        std::shared_ptr<Ledger const> ledger =
-            perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
-
-        return ledger;
+        return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
     }

     void
     acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
     {
-        std::unique_lock lock(acquiresMutex_);
-        try
+        if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
         {
-            if (pendingAcquires_.contains(hash))
-                return;
-            pendingAcquires_.insert(hash);
-            scope_unlock unlock(lock);
-            acquire(hash, seq, reason);
+            try
+            {
+                acquire(hash, seq, reason);
+            }
+            catch (std::exception const& e)
+            {
+                JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": " << e.what();
+            }
+            catch (...)
+            {
+                JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
+                                   "inbound ledger "
+                                << hash;
+            }
         }
-        catch (std::exception const& e)
-        {
-            JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": " << e.what();
-        }
-        catch (...)
-        {
-            JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
-        }
-        pendingAcquires_.erase(hash);
     }

     std::shared_ptr<InboundLedger>
@@ -890,8 +890,8 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
         return;
     }

-    JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " with >= " << minVal
-                           << " validations";
+    JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
+                           << to_short_string(ledger->header().hash) << ") with >= " << minVal << " validations";

     ledger->setValidated();
     ledger->setFull();
@@ -13,7 +13,8 @@ TimeoutCounter::TimeoutCounter(
     QueueJobParameter&& jobParameter,
     beast::Journal journal)
     : app_(app)
-    , journal_(journal)
+    , sink_(journal, to_short_string(hash) + " ")
+    , journal_(sink_)
     , hash_(hash)
     , timeouts_(0)
     , complete_(false)
@@ -33,6 +34,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
 {
     if (isDone())
         return;
+    JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
     timer_.expires_after(timerInterval_);
     timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
         if (ec == boost::asio::error::operation_aborted)
@@ -40,6 +42,9 @@ TimeoutCounter::setTimer(ScopedLockType& sl)

         if (auto ptr = wptr.lock())
         {
+            JLOG(ptr->journal_.debug()) << "timer: ec: " << ec
+                                        << " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
+                                        << (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
             ScopedLockType sl(ptr->mtx_);
             ptr->queueJob(sl);
         }
@@ -4,6 +4,7 @@
 #include <xrpld/app/main/Application.h>

 #include <xrpl/beast/utility/Journal.h>
+#include <xrpl/beast/utility/WrappedSink.h>
 #include <xrpl/core/Job.h>

 #include <boost/asio/basic_waitable_timer.hpp>
@@ -104,6 +105,7 @@ protected:
     // Used in this class for access to boost::asio::io_context and
     // xrpl::Overlay. Used in subtypes for the kitchen sink.
     Application& app_;
+    beast::WrappedSink sink_;
     beast::Journal journal_;
     mutable std::recursive_mutex mtx_;
@@ -71,6 +71,16 @@ HashRouter::shouldProcess(
     return s.shouldProcess(suppressionMap_.clock().now(), tx_interval);
 }

+bool
+HashRouter::shouldProcessForPeer(uint256 const& key, PeerShortID peer, std::chrono::seconds interval)
+{
+    std::lock_guard lock(mutex_);
+
+    auto& entry = emplace(key).first;
+
+    return entry.shouldProcessForPeer(peer, suppressionMap_.clock().now(), interval);
+}
+
 HashRouterFlags
 HashRouter::getFlags(uint256 const& key)
 {
@@ -142,4 +152,13 @@ setup_HashRouter(Config const& config)
     return setup;
 }

+auto
+HashRouter::getPeers(uint256 const& key) -> std::set<PeerShortID>
+{
+    std::lock_guard lock(mutex_);
+
+    auto& s = emplace(key).first;
+    return s.peekPeerSet();
+}
+
 } // namespace xrpl
@@ -140,6 +140,13 @@ private:
         return std::move(peers_);
     }

+    /** Return set of peers waiting for reply. Leaves list unchanged. */
+    std::set<PeerShortID> const&
+    peekPeerSet()
+    {
+        return peers_;
+    }
+
     /** Return seated relay time point if the message has been relayed */
     std::optional<Stopwatch::time_point>
     relayed() const
@@ -171,6 +178,17 @@ private:
         return true;
     }

+    bool
+    shouldProcessForPeer(PeerShortID peer, Stopwatch::time_point now, std::chrono::seconds interval)
+    {
+        if (peerProcessed_.contains(peer) && ((peerProcessed_[peer] + interval) > now))
+            return false;
+        // Peer may already be in the list, but adding it again doesn't hurt
+        addPeer(peer);
+        peerProcessed_[peer] = now;
+        return true;
+    }
+
 private:
     HashRouterFlags flags_ = HashRouterFlags::UNDEFINED;
     std::set<PeerShortID> peers_;
@@ -178,6 +196,7 @@ private:
     // than one flag needs to expire independently.
     std::optional<Stopwatch::time_point> relayed_;
    std::optional<Stopwatch::time_point> processed_;
+    std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
 };

 public:
@@ -200,7 +219,7 @@ public:

     /** Add a suppression peer and get message's relay status.
      * Return pair:
-     * element 1: true if the peer is added.
+     * element 1: true if the key is added.
      * element 2: optional is seated to the relay time point or
      * is unseated if has not relayed yet. */
     std::pair<bool, std::optional<Stopwatch::time_point>>
@@ -213,6 +232,15 @@ public:
     bool
     shouldProcess(uint256 const& key, PeerShortID peer, HashRouterFlags& flags, std::chrono::seconds tx_interval);

+    /** Determines whether the hashed item should be processed for the given
+        peer. Could be an incoming or outgoing message.
+
+        Items filtered with this function should only be processed for the given
+        peer once. Unlike shouldProcess, it can be processed for other peers.
+    */
+    bool
+    shouldProcessForPeer(uint256 const& key, PeerShortID peer, std::chrono::seconds interval);
+
     /** Set the flags on a hash.

         @return `true` if the flags were changed. `false` if unchanged.
@@ -238,6 +266,11 @@ public:
     std::optional<std::set<PeerShortID>>
     shouldRelay(uint256 const& key);

+    /** Returns a copy of the set of peers in the Entry for the key
+     */
+    std::set<PeerShortID>
+    getPeers(uint256 const& key);
+
 private:
     // pair.second indicates whether the entry was created
     std::pair<Entry&, bool>
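A hypothetical caller (not part of the diff) showing the intended pattern: each unique message is handled at most once per peer per interval, while other peers are unaffected. Only the shouldProcessForPeer API declared above is assumed; the 15s interval mirrors the getledgerInterval constant added later in this compare:

    // 'router' is a HashRouter; 'messageHash' identifies the request;
    // 'peerId' is the sending peer's PeerShortID
    using namespace std::chrono_literals;
    if (router.shouldProcessForPeer(messageHash, peerId, 15s))
    {
        // first sighting from this peer within 15s: process the request
    }
    else
    {
        // duplicate from this peer within the interval: drop or penalize
    }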
@@ -33,10 +33,10 @@
 #include <xrpld/rpc/MPTokenIssuanceID.h>
 #include <xrpld/rpc/ServerHandler.h>

+#include <xrpl/basics/CanProcess.h>
 #include <xrpl/basics/UptimeClock.h>
 #include <xrpl/basics/mulDiv.h>
 #include <xrpl/basics/safe_cast.h>
-#include <xrpl/basics/scope.h>
 #include <xrpl/beast/utility/rngfill.h>
 #include <xrpl/core/PerfLog.h>
 #include <xrpl/crypto/RFC1751.h>
@@ -378,7 +378,7 @@ public:
     isFull() override;

     void
-    setMode(OperatingMode om) override;
+    setMode(OperatingMode om, char const* reason) override;

     bool
     isBlocked() override;
@@ -809,7 +809,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
 inline void
 NetworkOPsImp::setStandAlone()
 {
-    setMode(OperatingMode::FULL);
+    setMode(OperatingMode::FULL, "setStandAlone");
 }

 inline void
@@ -945,7 +945,7 @@ NetworkOPsImp::processHeartbeatTimer()
     {
         if (mMode != OperatingMode::DISCONNECTED)
         {
-            setMode(OperatingMode::DISCONNECTED);
+            setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
             std::stringstream ss;
             ss << "Node count (" << numPeers << ") has fallen "
               << "below required minimum (" << minPeerCount_ << ").";
@@ -969,7 +969,7 @@ NetworkOPsImp::processHeartbeatTimer()

     if (mMode == OperatingMode::DISCONNECTED)
     {
-        setMode(OperatingMode::CONNECTED);
+        setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
         JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
         CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
     }
@@ -979,9 +979,9 @@ NetworkOPsImp::processHeartbeatTimer()
     auto origMode = mMode.load();
     CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
     if (mMode == OperatingMode::SYNCING)
-        setMode(OperatingMode::SYNCING);
+        setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
     else if (mMode == OperatingMode::CONNECTED)
-        setMode(OperatingMode::CONNECTED);
+        setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
     auto newMode = mMode.load();
     if (origMode != newMode)
     {
@@ -1650,7 +1650,7 @@ void
 NetworkOPsImp::setAmendmentBlocked()
 {
     amendmentBlocked_ = true;
-    setMode(OperatingMode::CONNECTED);
+    setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
 }

 inline bool
@@ -1681,7 +1681,7 @@ void
 NetworkOPsImp::setUNLBlocked()
 {
     unlBlocked_ = true;
-    setMode(OperatingMode::CONNECTED);
+    setMode(OperatingMode::CONNECTED, "setUNLBlocked");
 }

 inline void
@@ -1776,7 +1776,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint

     if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
     {
-        setMode(OperatingMode::CONNECTED);
+        setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
     }

     if (consensus)
@@ -1856,8 +1856,8 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed, std::unique_ptr<std:
     // this shouldn't happen unless we jump ledgers
     if (mMode == OperatingMode::FULL)
     {
-        JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
-        setMode(OperatingMode::TRACKING);
+        JLOG(m_journal.warn()) << "beginConsensus Don't have LCL, going to tracking";
+        setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
         CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
     }

@@ -1981,7 +1981,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
         // validations we have for LCL. If the ledger is good enough, go to
         // TRACKING - TODO
         if (!needNetworkLedger_)
-            setMode(OperatingMode::TRACKING);
+            setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
     }

     if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) && !ledgerChange)
@@ -1992,7 +1992,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
         auto current = m_ledgerMaster.getCurrentLedger();
         if (app_.timeKeeper().now() < (current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
         {
-            setMode(OperatingMode::FULL);
+            setMode(OperatingMode::FULL, "endConsensus: check full");
         }
     }

@@ -2004,7 +2004,7 @@ NetworkOPsImp::consensusViewChange()
 {
     if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
     {
-        setMode(OperatingMode::CONNECTED);
+        setMode(OperatingMode::CONNECTED, "consensusViewChange");
     }
 }

@@ -2302,7 +2302,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
 }

 void
-NetworkOPsImp::setMode(OperatingMode om)
+NetworkOPsImp::setMode(OperatingMode om, char const* reason)
 {
     using namespace std::chrono_literals;
     if (om == OperatingMode::CONNECTED)
@@ -2322,11 +2322,12 @@ NetworkOPsImp::setMode(OperatingMode om)
     if (mMode == om)
         return;

+    auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
     mMode = om;

     accounting_.mode(om);

-    JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
+    JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
     pubServer();
 }

@@ -2335,31 +2336,23 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
 {
     JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;

-    std::unique_lock lock(validationsMutex_);
-    BypassAccept bypassAccept = BypassAccept::no;
-    try
     {
-        if (pendingValidations_.contains(val->getLedgerHash()))
-            bypassAccept = BypassAccept::yes;
-        else
-            pendingValidations_.insert(val->getLedgerHash());
-        scope_unlock unlock(lock);
-        handleNewValidation(app_, val, source, bypassAccept, m_journal);
+        CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
+        try
+        {
+            BypassAccept bypassAccept = check ? BypassAccept::no : BypassAccept::yes;
+            handleNewValidation(app_, val, source, bypassAccept, m_journal);
+        }
+        catch (std::exception const& e)
+        {
+            JLOG(m_journal.warn()) << "Exception thrown for handling new validation " << val->getLedgerHash() << ": "
+                                   << e.what();
+        }
+        catch (...)
+        {
+            JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation " << val->getLedgerHash();
+        }
     }
-    catch (std::exception const& e)
-    {
-        JLOG(m_journal.warn()) << "Exception thrown for handling new validation " << val->getLedgerHash() << ": "
-                               << e.what();
-    }
-    catch (...)
-    {
-        JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation " << val->getLedgerHash();
-    }
-    if (bypassAccept == BypassAccept::no)
-    {
-        pendingValidations_.erase(val->getLedgerHash());
-    }
-    lock.unlock();

     pubValidation(val);
@@ -180,7 +180,7 @@ public:
     virtual bool
     isFull() = 0;
     virtual void
-    setMode(OperatingMode om) = 0;
+    setMode(OperatingMode om, char const* reason) = 0;
     virtual bool
     isBlocked() = 0;
     virtual bool
@@ -130,6 +130,14 @@ std::unique_ptr<NodeStore::Database>
 SHAMapStoreImp::makeNodeStore(int readThreads)
 {
     auto nscfg = app_.config().section(ConfigSection::nodeDatabase());
+
+    // Provide default values:
+    if (!nscfg.exists("cache_size"))
+        nscfg.set("cache_size", std::to_string(app_.config().getValueFor(SizedItem::treeCacheSize, std::nullopt)));
+
+    if (!nscfg.exists("cache_age"))
+        nscfg.set("cache_age", std::to_string(app_.config().getValueFor(SizedItem::treeCacheAge, std::nullopt)));
+
     std::unique_ptr<NodeStore::Database> db;

     if (deleteInterval_)
@@ -218,6 +226,8 @@ SHAMapStoreImp::run()
     LedgerIndex lastRotated = state_db_.getState().lastRotated;
     netOPs_ = &app_.getOPs();
     ledgerMaster_ = &app_.getLedgerMaster();
+    fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
+    treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());

     if (advisoryDelete_)
         canDelete_ = state_db_.getCanDelete();
@@ -480,13 +490,16 @@ void
 SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
 {
     ledgerMaster_->clearLedgerCachePrior(validatedSeq);
+    fullBelowCache_->clear();
 }

 void
 SHAMapStoreImp::freshenCaches()
 {
-    freshenCache(*app_.getNodeFamily().getTreeNodeCache());
-    freshenCache(app_.getMasterTransaction().getCache());
+    if (freshenCache(*treeNodeCache_))
+        return;
+    if (freshenCache(app_.getMasterTransaction().getCache()))
+        return;
 }

 void
@@ -94,6 +94,8 @@ private:
     // as of run() or before
     NetworkOPs* netOPs_ = nullptr;
     LedgerMaster* ledgerMaster_ = nullptr;
+    FullBelowCache* fullBelowCache_ = nullptr;
+    TreeNodeCache* treeNodeCache_ = nullptr;

     static constexpr auto nodeStoreName_ = "NodeStore";
@@ -14,11 +14,7 @@ namespace Resource {
 class Charge;
 }

-enum class ProtocolFeature {
-    ValidatorListPropagation,
-    ValidatorList2Propagation,
-    LedgerReplay,
-};
+enum class ProtocolFeature { ValidatorListPropagation, ValidatorList2Propagation, LedgerReplay, LedgerDataCookies };

 /** Represents a peer connection in the overlay. */
 class Peer
@@ -117,6 +113,13 @@ public:

     virtual bool
     txReduceRelayEnabled() const = 0;

+    //
+    // Messages
+    //
+
+    virtual std::set<std::optional<uint64_t>>
+    releaseRequestCookies(uint256 const& requestHash) = 0;
+
 };

 } // namespace xrpl
@@ -11,6 +11,7 @@
 #include <xrpld/app/tx/apply.h>
 #include <xrpld/overlay/Cluster.h>
 #include <xrpld/overlay/detail/PeerImp.h>
+#include <xrpld/overlay/detail/ProtocolMessage.h>
 #include <xrpld/overlay/detail/Tuning.h>

 #include <xrpl/basics/UptimeClock.h>
@@ -45,6 +46,8 @@ std::chrono::seconds constexpr peerTimerInterval{60};
 /** The timeout for a shutdown timer */
 std::chrono::seconds constexpr shutdownTimerInterval{5};

+/** How often we process duplicate incoming TMGetLedger messages */
+std::chrono::seconds constexpr getledgerInterval{15};
 } // namespace

 // TODO: Remove this exclusion once unit tests are added after the hotfix
@@ -461,6 +464,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
             return protocol_ >= make_protocol(2, 2);
         case ProtocolFeature::LedgerReplay:
             return ledgerReplayEnabled_;
+        case ProtocolFeature::LedgerDataCookies:
+            return protocol_ >= make_protocol(2, 3);
     }
     return false;
 }
@@ -1337,8 +1342,9 @@ PeerImp::handleTransaction(std::shared_ptr<protocol::TMTransaction> const& m, bo
 void
 PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
 {
-    auto badData = [&](std::string const& msg) {
-        fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
+    auto badData = [&](std::string const& msg, bool chargefee = true) {
+        if (chargefee)
+            fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
         JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
     };
     auto const itype{m->itype()};
@@ -1411,11 +1417,66 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
         }
     }

+    // Drop duplicate requests from the same peer for at least
+    // `getLedgerInterval` seconds.
+    // Append a little junk to prevent the hash of an incoming message
+    // from matching the hash of the same outgoing message.
+    // `shouldProcessForPeer` does not distinguish between incoming and
+    // outgoing, and some of the message relay logic checks the hash to see
+    // if the message has been relayed already. If the hashes are the same,
+    // a duplicate will be detected when sending the message is attempted,
+    // so it will fail.
+    auto const messageHash = sha512Half(*m, nullptr);
+    // Request cookies are not included in the hash. Track them here.
+    auto const requestCookie = [&m]() -> std::optional<uint64_t> {
+        if (m->has_requestcookie())
+            return m->requestcookie();
+        return std::nullopt;
+    }();
+    auto const [inserted, pending] = [&] {
+        std::lock_guard lock{cookieLock_};
+        auto& cookies = messageRequestCookies_[messageHash];
+        bool const pending = !cookies.empty();
+        return std::pair{cookies.emplace(requestCookie).second, pending};
+    }();
+    // Check if the request has been seen from this peer.
+    if (!app_.getHashRouter().shouldProcessForPeer(messageHash, id_, getledgerInterval))
+    {
+        // This request has already been seen from this peer.
+        // Has it been seen with this request cookie (or lack thereof)?
+
+        if (inserted)
+        {
+            // This is a duplicate request, but with a new cookie. When a
+            // response is ready, one will be sent for each request cookie.
+            JLOG(p_journal_.debug()) << "TMGetLedger: duplicate request with new request cookie: "
+                                     << requestCookie.value_or(0) << ". Job pending: " << (pending ? "yes" : "no")
+                                     << ": " << messageHash;
+            if (pending)
+            {
+                // Don't bother queueing up a new job if other requests are
+                // already pending. This should limit entries in the job queue
+                // to one per peer per unique request.
+                JLOG(p_journal_.debug()) << "TMGetLedger: Suppressing recvGetLedger job, since one "
+                                            "is pending: "
+                                         << messageHash;
+                return;
+            }
+        }
+        else
+        {
+            // Don't punish nodes that don't know any better
+            return badData(
+                "duplicate request: " + to_string(messageHash), supportsFeature(ProtocolFeature::LedgerDataCookies));
+        }
+    }
+
+    // Queue a job to process the request
+    JLOG(p_journal_.debug()) << "TMGetLedger: Adding recvGetLedger job: " << messageHash;
     std::weak_ptr<PeerImp> weak = shared_from_this();
-    app_.getJobQueue().addJob(jtLEDGER_REQ, "RcvGetLedger", [weak, m]() {
+    app_.getJobQueue().addJob(jtLEDGER_REQ, "RcvGetLedger", [weak, m, messageHash]() {
         if (auto peer = weak.lock())
-            peer->processLedgerRequest(m);
+            peer->processLedgerRequest(m, messageHash);
     });
 }
@@ -1514,8 +1575,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
 void
 PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
 {
-    auto badData = [&](std::string const& msg) {
-        fee_.update(Resource::feeInvalidData, msg);
+    auto badData = [&](std::string const& msg, bool charge = true) {
+        if (charge)
+            fee_.update(Resource::feeInvalidData, msg);
         JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
     };

@@ -1561,23 +1623,94 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
         return badData("Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
     }

-    // If there is a request cookie, attempt to relay the message
-    if (m->has_requestcookie())
+    auto const messageHash = sha512Half(*m);
+    if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_))
     {
-        if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
+        // Don't punish nodes that don't know any better
+        return badData(
+            "Duplicate message: " + to_string(messageHash), supportsFeature(ProtocolFeature::LedgerDataCookies));
+    }
+
+    bool const routed = m->has_directresponse() || m->responsecookies_size() || m->has_requestcookie();
+
+    {
+        // Check if this message needs to be forwarded to one or more peers.
+        // Maximum of one of the relevant fields should be populated.
+        XRPL_ASSERT(
+            !m->has_requestcookie() || !m->responsecookies_size(),
+            "ripple::PeerImp::onMessage(TMLedgerData) : valid cookie fields");
+
+        // Make a copy of the response cookies, then wipe the list so it can be
+        // forwarded cleanly
+        auto const responseCookies = m->responsecookies();
+        m->clear_responsecookies();
+        // Flag indicating if this response should be processed locally,
+        // possibly in addition to being forwarded.
+        bool const directResponse = m->has_directresponse() && m->directresponse();
+        m->clear_directresponse();
+
+        auto const relay = [this, m, &messageHash](auto const cookie) {
+            if (auto peer = overlay_.findPeerByShortID(cookie))
+            {
+                XRPL_ASSERT(
+                    !m->has_requestcookie() && !m->responsecookies_size(),
+                    "ripple::PeerImp::onMessage(TMLedgerData) relay : no "
+                    "cookies");
+                if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies))
+                    // Setting this flag is not _strictly_ necessary for peers
+                    // that support it if there are no cookies included in the
+                    // message, but it is more accurate.
+                    m->set_directresponse(true);
+                else
+                    m->clear_directresponse();
+                peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
+            }
+            else
+                JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply to peer [" << cookie
+                                        << "]: " << messageHash;
+        };
+        // If there is a request cookie, attempt to relay the message
+        if (m->has_requestcookie())
         {
+            XRPL_ASSERT(
+                responseCookies.empty(),
+                "ripple::PeerImp::onMessage(TMLedgerData) : no response "
+                "cookies");
             m->clear_requestcookie();
-            peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
+            relay(m->requestcookie());
+            if (!directResponse && responseCookies.empty())
+                return;
         }
-        else
+        // If there's a list of request cookies, attempt to relay the message to
+        // all of them.
+        if (responseCookies.size())
         {
-            JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
+            for (auto const cookie : responseCookies)
+                relay(cookie);
+            if (!directResponse)
+                return;
         }
-        return;
     }

+    // Now that any forwarding is done check the base message (data only, no
+    // routing info for duplicates)
+    if (routed)
+    {
+        m->clear_directresponse();
+        XRPL_ASSERT(
+            !m->has_requestcookie() && !m->responsecookies_size(),
+            "ripple::PeerImp::onMessage(TMLedgerData) : no cookies");
+        auto const baseMessageHash = sha512Half(*m);
+        if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_))
+        {
+            // Don't punish nodes that don't know any better
+            return badData(
+                "Duplicate message: " + to_string(baseMessageHash),
+                supportsFeature(ProtocolFeature::LedgerDataCookies));
+        }
+        return;
+    }

     uint256 const ledgerHash{m->ledgerhash()};

     // Otherwise check if received data for a candidate transaction set
     if (m->type() == protocol::liTS_CANDIDATE)
     {
@@ -2849,16 +2982,21 @@ PeerImp::checkValidation(
 // the TX tree with the specified root hash.
 //
 static std::shared_ptr<PeerImp>
-getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
+getPeerWithTree(
+    OverlayImpl& ov,
+    uint256 const& rootHash,
+    PeerImp const* skip,
+    std::function<bool(Peer::id_t)> shouldProcessCallback)
 {
     std::shared_ptr<PeerImp> ret;
     int retScore = 0;

+    XRPL_ASSERT(shouldProcessCallback, "ripple::getPeerWithTree : callback provided");
     ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
         if (p->hasTxSet(rootHash) && p.get() != skip)
         {
             auto score = p->getScore(true);
-            if (!ret || (score > retScore))
+            if (!ret || (score > retScore && shouldProcessCallback(p->id())))
             {
                 ret = std::move(p);
                 retScore = score;
@@ -2873,16 +3011,22 @@ getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
 // have the ledger and how responsive it is.
 //
 static std::shared_ptr<PeerImp>
-getPeerWithLedger(OverlayImpl& ov, uint256 const& ledgerHash, LedgerIndex ledger, PeerImp const* skip)
+getPeerWithLedger(
+    OverlayImpl& ov,
+    uint256 const& ledgerHash,
+    LedgerIndex ledger,
+    PeerImp const* skip,
+    std::function<bool(Peer::id_t)> shouldProcessCallback)
 {
     std::shared_ptr<PeerImp> ret;
     int retScore = 0;

+    XRPL_ASSERT(shouldProcessCallback, "ripple::getPeerWithLedger : callback provided");
     ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
         if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
         {
             auto score = p->getScore(true);
-            if (!ret || (score > retScore))
+            if (!ret || (score > retScore && shouldProcessCallback(p->id())))
             {
                 ret = std::move(p);
                 retScore = score;
@@ -2894,7 +3038,10 @@ getPeerWithLedger(OverlayImpl& ov, uint256 const& ledgerHash, LedgerIndex ledger
 }
 
 void
-PeerImp::sendLedgerBase(std::shared_ptr<Ledger const> const& ledger, protocol::TMLedgerData& ledgerData)
+PeerImp::sendLedgerBase(
+    std::shared_ptr<Ledger const> const& ledger,
+    protocol::TMLedgerData& ledgerData,
+    PeerCookieMap const& destinations)
 {
     JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
 
@@ -2924,14 +3071,90 @@ PeerImp::sendLedgerBase(std::shared_ptr<Ledger const> const& ledger, protocol::T
         }
     }
 
-    auto message{std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
-    send(message);
+    sendToMultiple(ledgerData, destinations);
 }
 
+void
+PeerImp::sendToMultiple(protocol::TMLedgerData& ledgerData, PeerCookieMap const& destinations)
+{
+    bool foundSelf = false;
+    for (auto const& [peer, cookies] : destinations)
+    {
+        if (peer.get() == this)
+            foundSelf = true;
+        bool const multipleCookies = peer->supportsFeature(ProtocolFeature::LedgerDataCookies);
+        std::vector<std::uint64_t> sendCookies;
+
+        bool directResponse = false;
+        if (!multipleCookies)
+        {
+            JLOG(p_journal_.debug()) << "sendToMultiple: Sending " << cookies.size()
+                                     << " TMLedgerData messages to peer [" << peer->id()
+                                     << "]: " << sha512Half(ledgerData);
+        }
+        for (auto const& cookie : cookies)
+        {
+            // Unfortunately, need a separate Message object for every
+            // combination
+            if (cookie)
+            {
+                if (multipleCookies)
+                {
+                    // Save this one for later to send a single message
+                    sendCookies.emplace_back(*cookie);
+                    continue;
+                }
+
+                // Feature not supported, so send a single message with a
+                // single cookie
+                ledgerData.set_requestcookie(*cookie);
+            }
+            else
+            {
+                if (multipleCookies)
+                {
+                    // Set this flag later on the single message
+                    directResponse = true;
+                    continue;
+                }
+
+                ledgerData.clear_requestcookie();
+            }
+            XRPL_ASSERT(
+                !multipleCookies,
+                "ripple::PeerImp::sendToMultiple : ledger data cookies "
+                "unsupported");
+            auto message{std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
+            peer->send(message);
+        }
+        if (multipleCookies)
+        {
+            // Send a single message with all the cookies and/or the direct
+            // response flag, so the receiver can farm out the single message to
+            // multiple peers and/or itself
+            XRPL_ASSERT(
+                sendCookies.size() || directResponse, "ripple::PeerImp::sendToMultiple : valid response options");
+            ledgerData.clear_requestcookie();
+            ledgerData.clear_responsecookies();
+            ledgerData.set_directresponse(directResponse);
+            for (auto const& cookie : sendCookies)
+                ledgerData.add_responsecookies(cookie);
+            auto message{std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
+            peer->send(message);
+
+            JLOG(p_journal_.debug()) << "sendToMultiple: Sent 1 TMLedgerData message to peer [" << peer->id()
+                                     << "]: including " << (directResponse ? "the direct response flag and " : "")
+                                     << sendCookies.size() << " response cookies: " << sha512Half(ledgerData);
+        }
+    }
+    XRPL_ASSERT(foundSelf, "ripple::PeerImp::sendToMultiple : current peer included");
+}
 
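sendToMultiple's fan-out reduces to: peers that understand cookie lists get one message carrying every relay cookie, while legacy peers get one message per cookie. A sketch under those assumptions, with toy types standing in for PeerImp and TMLedgerData:

#include <cstdint>
#include <map>
#include <optional>
#include <set>
#include <utility>
#include <vector>

struct FakePeer
{
    bool supportsCookieLists = false;
    std::vector<std::vector<std::uint64_t>> sent;  // cookies per message

    void
    send(std::vector<std::uint64_t> cookies)
    {
        sent.push_back(std::move(cookies));
    }
};

// std::nullopt means "this peer asked directly; no relay cookie".
using Destinations =
    std::map<FakePeer*, std::set<std::optional<std::uint64_t>>>;

void
fanOut(Destinations const& destinations)
{
    for (auto const& [peer, cookies] : destinations)
    {
        if (peer->supportsCookieLists)
        {
            // One message carrying every relay cookie at once.
            std::vector<std::uint64_t> all;
            for (auto const& c : cookies)
                if (c)
                    all.push_back(*c);
            peer->send(std::move(all));
        }
        else
        {
            // Legacy path: a separate copy of the message per cookie.
            for (auto const& c : cookies)
                peer->send(
                    c ? std::vector<std::uint64_t>{*c}
                      : std::vector<std::uint64_t>{});
        }
    }
}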
 std::shared_ptr<Ledger const>
-PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
+PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash)
 {
-    JLOG(p_journal_.trace()) << "getLedger: Ledger";
+    JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash;
 
     std::shared_ptr<Ledger const> ledger;
 
@@ -2947,16 +3170,23 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
         if (m->has_querytype() && !m->has_requestcookie())
         {
             // Attempt to relay the request to a peer
-            if (auto const peer =
-                    getPeerWithLedger(overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0, this))
+            // Note repeated messages will not relay to the same peer
+            // before `getledgerInterval` seconds. This prevents one
+            // peer from getting flooded, and distributes the request
+            // load. If a request has been relayed to all eligible
+            // peers, then this message will not be relayed.
+            if (auto const peer = getPeerWithLedger(
+                    overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0, this, [&](Peer::id_t id) {
+                        return app_.getHashRouter().shouldProcessForPeer(mHash, id, getledgerInterval);
+                    }))
             {
                 m->set_requestcookie(id());
                 peer->send(std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
-                JLOG(p_journal_.debug()) << "getLedger: Request relayed to peer";
+                JLOG(p_journal_.debug()) << "getLedger: Request relayed to peer [" << peer->id() << "]: " << mHash;
                 return ledger;
             }
 
-            JLOG(p_journal_.trace()) << "getLedger: Failed to find peer to relay request";
+            JLOG(p_journal_.trace()) << "getLedger: Don't have ledger with hash " << ledgerHash << ": " << mHash;
         }
     }
 }
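shouldProcessForPeer acts here as a per-(request, peer) throttle: a given request hash may be relayed to a given peer at most once per interval. A minimal sketch of such a throttle with hypothetical types; the real HashRouter also expires entries and tracks more state:

#include <chrono>
#include <cstdint>
#include <map>
#include <utility>

using Clock = std::chrono::steady_clock;

class RelayThrottle
{
    // (request hash, peer id) -> last relay time
    std::map<std::pair<std::uint64_t, std::uint32_t>, Clock::time_point> last_;

public:
    bool
    shouldProcessForPeer(
        std::uint64_t requestHash,
        std::uint32_t peerId,
        std::chrono::seconds interval)
    {
        auto const now = Clock::now();
        auto const key = std::make_pair(requestHash, peerId);
        if (auto const it = last_.find(key);
            it != last_.end() && now - it->second < interval)
            return false;  // relayed to this peer too recently
        last_[key] = now;
        return true;
    }
};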
@@ -2965,14 +3195,15 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
         // Attempt to find ledger by sequence
         if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
         {
-            JLOG(p_journal_.debug()) << "getLedger: Early ledger sequence request";
+            JLOG(p_journal_.debug()) << "getLedger: Early ledger sequence request " << mHash;
         }
         else
         {
             ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
             if (!ledger)
             {
-                JLOG(p_journal_.debug()) << "getLedger: Don't have ledger with sequence " << m->ledgerseq();
+                JLOG(p_journal_.debug()) << "getLedger: Don't have ledger with sequence " << m->ledgerseq() << ": "
+                                         << mHash;
             }
         }
     }
@@ -2994,27 +3225,27 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
             charge(Resource::feeMalformedRequest, "get_ledger ledgerSeq");
 
             ledger.reset();
-            JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence " << ledgerSeq;
+            JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence " << ledgerSeq << ": " << mHash;
         }
     }
     else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
     {
         ledger.reset();
-        JLOG(p_journal_.debug()) << "getLedger: Early ledger sequence request " << ledgerSeq;
+        JLOG(p_journal_.debug()) << "getLedger: Early ledger sequence request " << ledgerSeq << ": " << mHash;
     }
 }
 else
 {
-    JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
+    JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger " << mHash;
 }
 
 return ledger;
 }
 
 std::shared_ptr<SHAMap const>
-PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
+PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash) const
 {
-    JLOG(p_journal_.trace()) << "getTxSet: TX set";
+    JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash;
 
     uint256 const txSetHash{m->ledgerhash()};
     std::shared_ptr<SHAMap> shaMap{app_.getInboundTransactions().getSet(txSetHash, false)};
@@ -3023,20 +3254,27 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
         if (m->has_querytype() && !m->has_requestcookie())
         {
             // Attempt to relay the request to a peer
-            if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
+            // Note repeated messages will not relay to the same peer
+            // before `getledgerInterval` seconds. This prevents one
+            // peer from getting flooded, and distributes the request
+            // load. If a request has been relayed to all eligible
+            // peers, then this message will not be relayed.
+            if (auto const peer = getPeerWithTree(overlay_, txSetHash, this, [&](Peer::id_t id) {
+                    return app_.getHashRouter().shouldProcessForPeer(mHash, id, getledgerInterval);
+                }))
             {
                 m->set_requestcookie(id());
                 peer->send(std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
-                JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
+                JLOG(p_journal_.debug()) << "getTxSet: Request relayed to peer [" << peer->id() << "]: " << mHash;
             }
             else
             {
-                JLOG(p_journal_.debug()) << "getTxSet: Failed to find relay peer";
+                JLOG(p_journal_.debug()) << "getTxSet: Failed to find relay peer: " << mHash;
             }
         }
         else
         {
-            JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
+            JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set " << mHash;
         }
     }
 
@@ -3044,7 +3282,7 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
 }
 
 void
-PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
+PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash)
 {
     // Do not resource charge a peer responding to a relay
     if (!m->has_requestcookie())
@@ -3057,9 +3295,71 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
     bool fatLeaves{true};
     auto const itype{m->itype()};
 
+    auto getDestinations = [&] {
+        // If a ledger data message is generated, it's going to be sent to
+        // every peer that is waiting for it.
+
+        PeerCookieMap result;
+
+        std::size_t numCookies = 0;
+        {
+            // Don't do the work under this peer if this peer is not waiting
+            // for any replies
+            auto myCookies = releaseRequestCookies(mHash);
+            if (myCookies.empty())
+            {
+                JLOG(p_journal_.debug()) << "TMGetLedger: peer is no longer "
+                                            "waiting for response to request: "
+                                         << mHash;
+                return result;
+            }
+            numCookies += myCookies.size();
+            result[shared_from_this()] = myCookies;
+        }
+
+        std::set<HashRouter::PeerShortID> const peers = app_.getHashRouter().getPeers(mHash);
+        for (auto const peerID : peers)
+        {
+            // This loop does not need to be done under the HashRouter
+            // lock because findPeerByShortID and releaseRequestCookies
+            // are thread safe, and everything else is local
+            if (auto p = overlay_.findPeerByShortID(peerID))
+            {
+                auto cookies = p->releaseRequestCookies(mHash);
+                numCookies += cookies.size();
+                if (result.contains(p))
+                {
+                    // Unlikely, but if a request came in to this peer while
+                    // iterating, add the items instead of copying /
+                    // overwriting.
+                    XRPL_ASSERT(
+                        p.get() == this,
+                        "ripple::PeerImp::processLedgerRequest : found self in "
+                        "map");
+                    for (auto const& cookie : cookies)
+                        result[p].emplace(cookie);
+                }
+                else if (cookies.size())
+                    result[p] = cookies;
+            }
+        }
+
+        JLOG(p_journal_.debug()) << "TMGetLedger: Processing request for " << result.size() << " peers. Will send "
+                                 << numCookies << " messages if successful: " << mHash;
+
+        return result;
+    };
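The merge-don't-overwrite step in that lambda matters when the same peer shows up twice while iterating. The same step in isolation, with stand-in types:

#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>

using PeerName = std::string;
using Cookies = std::set<std::uint64_t>;

// Collect cookies per peer, merging if the same peer shows up again.
void
addDestination(
    std::map<PeerName, Cookies>& result,
    PeerName const& peer,
    Cookies cookies)
{
    if (auto const it = result.find(peer); it != result.end())
        it->second.insert(cookies.begin(), cookies.end());  // merge
    else if (!cookies.empty())
        result.emplace(peer, std::move(cookies));
}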
+    // Will only populate this if we're going to do work.
+    PeerCookieMap destinations;
+
     if (itype == protocol::liTS_CANDIDATE)
     {
-        if (sharedMap = getTxSet(m); !sharedMap)
+        destinations = getDestinations();
+        if (destinations.empty())
+            // Nowhere to send the response!
+            return;
+
+        if (sharedMap = getTxSet(m, mHash); !sharedMap)
             return;
         map = sharedMap.get();
 
@@ -3067,8 +3367,6 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
         ledgerData.set_ledgerseq(0);
         ledgerData.set_ledgerhash(m->ledgerhash());
         ledgerData.set_type(protocol::liTS_CANDIDATE);
-        if (m->has_requestcookie())
-            ledgerData.set_requestcookie(m->requestcookie());
 
         // We'll already have most transactions
         fatLeaves = false;
@@ -3086,7 +3384,12 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
             return;
         }
 
-        if (ledger = getLedger(m); !ledger)
+        destinations = getDestinations();
+        if (destinations.empty())
+            // Nowhere to send the response!
+            return;
+
+        if (ledger = getLedger(m, mHash); !ledger)
             return;
 
         // Fill out the reply
@@ -3094,13 +3397,11 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
         ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
         ledgerData.set_ledgerseq(ledger->header().seq);
         ledgerData.set_type(itype);
-        if (m->has_requestcookie())
-            ledgerData.set_requestcookie(m->requestcookie());
 
         switch (itype)
         {
             case protocol::liBASE:
-                sendLedgerBase(ledger, ledgerData);
+                sendLedgerBase(ledger, ledgerData, destinations);
                 return;
 
             case protocol::liTX_NODE:
@@ -3203,7 +3504,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
     if (ledgerData.nodes_size() == 0)
         return;
 
-    send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
+    sendToMultiple(ledgerData, destinations);
 }
 
 int
@@ -3251,6 +3552,19 @@ PeerImp::isHighLatency() const
     return latency_ >= peerHighLatency;
 }
 
+std::set<std::optional<uint64_t>>
+PeerImp::releaseRequestCookies(uint256 const& requestHash)
+{
+    std::set<std::optional<uint64_t>> result;
+    std::lock_guard lock(cookieLock_);
+    if (messageRequestCookies_.contains(requestHash))
+    {
+        std::swap(result, messageRequestCookies_[requestHash]);
+        messageRequestCookies_.erase(requestHash);
+    }
+    return result;
+};
+
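releaseRequestCookies is a swap-and-erase under a lock: the caller takes ownership of the stored set atomically, so two threads can never both answer the same cookie. The same idiom in a self-contained form, with generic types, using find and erase-by-iterator instead of contains, operator[], and erase-by-key to avoid the repeated lookups:

#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <utility>

class CookieStore
{
    std::mutex mutable lock_;
    std::map<std::uint64_t, std::set<std::uint64_t>> cookies_;

public:
    void
    add(std::uint64_t requestHash, std::uint64_t cookie)
    {
        std::lock_guard guard(lock_);
        cookies_[requestHash].insert(cookie);
    }

    // Atomically take every cookie recorded for the hash; a concurrent
    // caller sees an empty set, so no cookie is ever answered twice.
    std::set<std::uint64_t>
    release(std::uint64_t requestHash)
    {
        std::set<std::uint64_t> result;
        std::lock_guard guard(lock_);
        if (auto const it = cookies_.find(requestHash); it != cookies_.end())
        {
            std::swap(result, it->second);
            cookies_.erase(it);
        }
        return result;
    }
};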
 void
 PeerImp::Metrics::add_message(std::uint64_t bytes)
 {
 
@@ -248,6 +248,13 @@ private:
     bool ledgerReplayEnabled_ = false;
     LedgerReplayMsgHandler ledgerReplayMsgHandler_;
 
+    // Track message requests and responses
+    // TODO: Use an expiring cache or something
+    using MessageCookieMap = std::map<uint256, std::set<std::optional<uint64_t>>>;
+    using PeerCookieMap = std::map<std::shared_ptr<Peer>, std::set<std::optional<uint64_t>>>;
+    std::mutex mutable cookieLock_;
+    MessageCookieMap messageRequestCookies_;
+
     friend class OverlayImpl;
 
     class Metrics
@@ -489,6 +496,13 @@ public:
         return txReduceRelayEnabled_;
     }
 
+    //
+    // Messages
+    //
+
+    std::set<std::optional<uint64_t>>
+    releaseRequestCookies(uint256 const& requestHash) override;
+
 private:
     /**
      * @brief Handles a failure associated with a specific error code.
@@ -774,16 +788,22 @@ private:
         std::shared_ptr<protocol::TMValidation> const& packet);
 
     void
-    sendLedgerBase(std::shared_ptr<Ledger const> const& ledger, protocol::TMLedgerData& ledgerData);
-
-    std::shared_ptr<Ledger const>
-    getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
-
-    std::shared_ptr<SHAMap const>
-    getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
+    sendLedgerBase(
+        std::shared_ptr<Ledger const> const& ledger,
+        protocol::TMLedgerData& ledgerData,
+        PeerCookieMap const& destinations);
 
     void
-    processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
+    sendToMultiple(protocol::TMLedgerData& ledgerData, PeerCookieMap const& destinations);
+
+    std::shared_ptr<Ledger const>
+    getLedger(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash);
+
+    std::shared_ptr<SHAMap const>
+    getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash) const;
+
+    void
+    processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash);
 };
 
 //------------------------------------------------------------------------------
 
@@ -1,8 +1,11 @@
 #include <xrpld/app/main/Application.h>
+#include <xrpld/app/misc/HashRouter.h>
 #include <xrpld/core/JobQueue.h>
 #include <xrpld/overlay/Overlay.h>
 #include <xrpld/overlay/PeerSet.h>
 
 #include <xrpl/core/JobQueue.h>
+#include <xrpl/protocol/digest.h>
 
 namespace xrpl {
 
@@ -81,16 +84,44 @@ PeerSetImpl::sendRequest(
     std::shared_ptr<Peer> const& peer)
 {
     auto packet = std::make_shared<Message>(message, type);
 
+    auto const messageHash = [&]() {
+        auto const packetBuffer = packet->getBuffer(compression::Compressed::Off);
+        return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
+    }();
+
+    // Allow messages to be re-sent to the same peer after a delay
+    using namespace std::chrono_literals;
+    constexpr std::chrono::seconds interval = 30s;
+
     if (peer)
     {
-        peer->send(packet);
+        if (app_.getHashRouter().shouldProcessForPeer(messageHash, peer->id(), interval))
+        {
+            JLOG(journal_.trace()) << "Sending " << protocolMessageName(type) << " message to [" << peer->id()
+                                   << "]: " << messageHash;
+            peer->send(packet);
+        }
+        else
+            JLOG(journal_.debug()) << "Suppressing sending duplicate " << protocolMessageName(type) << " message to ["
+                                   << peer->id() << "]: " << messageHash;
         return;
     }
 
     for (auto id : peers_)
     {
         if (auto p = app_.overlay().findPeerByShortID(id))
-            p->send(packet);
+        {
+            if (app_.getHashRouter().shouldProcessForPeer(messageHash, p->id(), interval))
+            {
+                JLOG(journal_.trace()) << "Sending " << protocolMessageName(type) << " message to [" << p->id()
+                                       << "]: " << messageHash;
+                p->send(packet);
+            }
+            else
+                JLOG(journal_.debug()) << "Suppressing sending duplicate " << protocolMessageName(type)
+                                       << " message to [" << p->id() << "]: " << messageHash;
+        }
     }
 }
 
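The suppression key in sendRequest is derived from the packet's uncompressed wire bytes, so identical requests map to identical keys regardless of per-connection compression. A sketch of the same idea, with FNV-1a standing in for sha512Half; the serializeUncompressed call in the usage comment is hypothetical:

#include <cstdint>
#include <string>

// FNV-1a over the serialized bytes; sha512Half plays this role above.
std::uint64_t
suppressionKey(std::string const& wireBytes)
{
    std::uint64_t h = 1469598103934665603ull;
    for (unsigned char const c : wireBytes)
    {
        h ^= c;
        h *= 1099511628211ull;
    }
    return h;
}

// Intended use (names hypothetical): serialize once with compression off,
// key the throttle on the digest, and only then send.
//
//   auto const key = suppressionKey(packet.serializeUncompressed());
//   if (throttle.shouldProcessForPeer(key, peerId, std::chrono::seconds{30}))
//       peer.send(packet);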
@@ -24,6 +24,12 @@ protocolMessageType(protocol::TMGetLedger const&)
     return protocol::mtGET_LEDGER;
 }
 
+inline protocol::MessageType
+protocolMessageType(protocol::TMLedgerData const&)
+{
+    return protocol::mtLEDGER_DATA;
+}
+
 inline protocol::MessageType
 protocolMessageType(protocol::TMReplayDeltaRequest const&)
 {
@@ -425,4 +431,64 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler, std::size_t& hin
 
 } // namespace xrpl
 
+namespace protocol {
+
+template <class Hasher>
+void
+hash_append(Hasher& h, TMGetLedger const& msg)
+{
+    using beast::hash_append;
+    using namespace ripple;
+    hash_append(h, safe_cast<int>(protocolMessageType(msg)));
+    hash_append(h, safe_cast<int>(msg.itype()));
+    if (msg.has_ltype())
+        hash_append(h, safe_cast<int>(msg.ltype()));
+
+    if (msg.has_ledgerhash())
+        hash_append(h, msg.ledgerhash());
+
+    if (msg.has_ledgerseq())
+        hash_append(h, msg.ledgerseq());
+
+    for (auto const& nodeId : msg.nodeids())
+        hash_append(h, nodeId);
+    hash_append(h, msg.nodeids_size());
+
+    // Do NOT include the request cookie. It does not affect the content of
+    // the request, but only where to route the results.
+    // if (msg.has_requestcookie())
+    //     hash_append(h, msg.requestcookie());
+
+    if (msg.has_querytype())
+        hash_append(h, safe_cast<int>(msg.querytype()));
+
+    if (msg.has_querydepth())
+        hash_append(h, msg.querydepth());
+}
+
+template <class Hasher>
+void
+hash_append(Hasher& h, TMLedgerData const& msg)
+{
+    using beast::hash_append;
+    using namespace ripple;
+    hash_append(h, safe_cast<int>(protocolMessageType(msg)));
+    hash_append(h, msg.ledgerhash());
+    hash_append(h, msg.ledgerseq());
+    hash_append(h, safe_cast<int>(msg.type()));
+    for (auto const& node : msg.nodes())
+    {
+        hash_append(h, node.nodedata());
+        if (node.has_nodeid())
+            hash_append(h, node.nodeid());
+    }
+    hash_append(h, msg.nodes_size());
+    if (msg.has_requestcookie())
+        hash_append(h, msg.requestcookie());
+    if (msg.has_error())
+        hash_append(h, safe_cast<int>(msg.error()));
+}
+
+} // namespace protocol
+
 #endif
 
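These overloads follow beast's hash_append extension protocol: a message feeds its salient fields to an arbitrary Hasher, optional fields contribute only when present, and routing-only fields (the request cookie in TMGetLedger) are deliberately skipped. The pattern in miniature, with a toy hasher and a hypothetical message type:

#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// Toy hasher: any type with operator()(void const*, std::size_t) works.
struct Fnv1aHasher
{
    std::uint64_t state = 1469598103934665603ull;

    void
    operator()(void const* data, std::size_t size)
    {
        auto const* p = static_cast<unsigned char const*>(data);
        for (std::size_t i = 0; i < size; ++i)
        {
            state ^= p[i];
            state *= 1099511628211ull;
        }
    }
};

template <class Hasher>
void
hash_append(Hasher& h, std::uint32_t v)
{
    h(&v, sizeof(v));
}

template <class Hasher>
void
hash_append(Hasher& h, std::string const& s)
{
    h(s.data(), s.size());
}

// Hypothetical message: only content fields are hashed, never routing.
struct GetLedgerRequest
{
    std::uint32_t itype;
    std::string ledgerHash;
    std::optional<std::uint32_t> queryDepth;
    std::uint64_t requestCookie;  // routing only: deliberately never hashed
};

template <class Hasher>
void
hash_append(Hasher& h, GetLedgerRequest const& msg)
{
    hash_append(h, msg.itype);
    hash_append(h, msg.ledgerHash);
    if (msg.queryDepth)  // optional fields contribute only when present
        hash_append(h, *msg.queryDepth);
}

int
main()
{
    Fnv1aHasher h;
    hash_append(h, GetLedgerRequest{1, "abc", std::nullopt, 42});
    return h.state != 0 ? 0 : 1;
}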
@@ -21,7 +21,9 @@ namespace xrpl {
 constexpr ProtocolVersion const supportedProtocolList[]
 {
     {2, 1},
-    {2, 2}
+    {2, 2},
+    // Adds TMLedgerData::responseCookies and directResponse
+    {2, 3}
 };
 // clang-format on
 
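A peer's negotiated version can then gate the new fields. A sketch of such a check, assuming versions compare lexicographically as {major, minor} pairs; the helper name is hypothetical:

#include <utility>

using ProtocolVersion = std::pair<int, int>;  // {major, minor}

// Hypothetical helper: the new TMLedgerData fields arrive with 2.3.
constexpr bool
supportsLedgerDataCookies(ProtocolVersion v)
{
    return v >= ProtocolVersion{2, 3};  // lexicographic pair comparison
}

static_assert(!supportsLedgerDataCookies({2, 2}));
static_assert(supportsLedgerDataCookies({2, 3}));
static_assert(supportsLedgerDataCookies({3, 0}));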