mirror of
https://github.com/Xahau/xahaud.git
synced 2025-12-06 17:27:52 +00:00
Improve ledger-fetching logic:
When fetching ledgers, the existing code would isolate the peer that sent the most useful responses and issue follow up queries only to that peer. This commit increases the query aggressiveness, and changes the mechanism used to select which peers to issue follow-up queries to so as to more evenly spread the load along those peers which provided useful responses.
This commit is contained in:
@@ -39,9 +39,6 @@ class InboundLedger final : public TimeoutCounter,
|
|||||||
public:
|
public:
|
||||||
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
|
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
|
||||||
|
|
||||||
using PeerDataPairType =
|
|
||||||
std::pair<std::weak_ptr<Peer>, std::shared_ptr<protocol::TMLedgerData>>;
|
|
||||||
|
|
||||||
// These are the reasons we might acquire a ledger
|
// These are the reasons we might acquire a ledger
|
||||||
enum class Reason {
|
enum class Reason {
|
||||||
HISTORY, // Acquiring past ledger
|
HISTORY, // Acquiring past ledger
|
||||||
@@ -193,7 +190,9 @@ private:
|
|||||||
|
|
||||||
// Data we have received from peers
|
// Data we have received from peers
|
||||||
std::mutex mReceivedDataLock;
|
std::mutex mReceivedDataLock;
|
||||||
std::vector<PeerDataPairType> mReceivedData;
|
std::vector<
|
||||||
|
std::pair<std::weak_ptr<Peer>, std::shared_ptr<protocol::TMLedgerData>>>
|
||||||
|
mReceivedData;
|
||||||
bool mReceiveDispatched;
|
bool mReceiveDispatched;
|
||||||
std::unique_ptr<PeerSet> mPeerSet;
|
std::unique_ptr<PeerSet> mPeerSet;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -33,7 +33,10 @@
|
|||||||
#include <ripple/resource/Fees.h>
|
#include <ripple/resource/Fees.h>
|
||||||
#include <ripple/shamap/SHAMapNodeID.h>
|
#include <ripple/shamap/SHAMapNodeID.h>
|
||||||
|
|
||||||
|
#include <boost/iterator/function_output_iterator.hpp>
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <random>
|
||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
|
|
||||||
@@ -57,15 +60,15 @@ enum {
|
|||||||
|
|
||||||
// Number of nodes to find initially
|
// Number of nodes to find initially
|
||||||
,
|
,
|
||||||
missingNodesFind = 256
|
missingNodesFind = 512
|
||||||
|
|
||||||
// Number of nodes to request for a reply
|
// Number of nodes to request for a reply
|
||||||
,
|
,
|
||||||
reqNodesReply = 128
|
reqNodesReply = 256
|
||||||
|
|
||||||
// Number of nodes to request blindly
|
// Number of nodes to request blindly
|
||||||
,
|
,
|
||||||
reqNodes = 8
|
reqNodes = 12
|
||||||
};
|
};
|
||||||
|
|
||||||
// millisecond for each ledger timeout
|
// millisecond for each ledger timeout
|
||||||
@@ -601,7 +604,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
|
|||||||
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
|
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
|
||||||
for (auto const& p : need)
|
for (auto const& p : need)
|
||||||
{
|
{
|
||||||
JLOG(journal_.warn()) << "Want: " << p.second;
|
JLOG(journal_.debug()) << "Want: " << p.second;
|
||||||
|
|
||||||
if (!typeSet)
|
if (!typeSet)
|
||||||
{
|
{
|
||||||
@@ -661,15 +664,15 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
|
|||||||
if (reason != TriggerReason::reply)
|
if (reason != TriggerReason::reply)
|
||||||
{
|
{
|
||||||
// If we're querying blind, don't query deep
|
// If we're querying blind, don't query deep
|
||||||
tmGL.set_querydepth(0);
|
tmGL.set_querydepth(1);
|
||||||
}
|
}
|
||||||
else if (peer && peer->isHighLatency())
|
else if (peer && peer->isHighLatency())
|
||||||
{
|
{
|
||||||
// If the peer has high latency, query extra deep
|
// If the peer has high latency, query extra deep
|
||||||
tmGL.set_querydepth(2);
|
tmGL.set_querydepth(3);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
tmGL.set_querydepth(1);
|
tmGL.set_querydepth(2);
|
||||||
|
|
||||||
// Get the state data first because it's the most likely to be useful
|
// Get the state data first because it's the most likely to be useful
|
||||||
// if we wind up abandoning this fetch.
|
// if we wind up abandoning this fetch.
|
||||||
@@ -952,22 +955,23 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
auto const f = filter.get();
|
||||||
|
|
||||||
for (auto const& node : packet.nodes())
|
for (auto const& node : packet.nodes())
|
||||||
{
|
{
|
||||||
auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
|
auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
|
||||||
|
|
||||||
if (!nodeID)
|
if (!nodeID)
|
||||||
{
|
throw std::runtime_error("data does not properly deserialize");
|
||||||
san.incInvalid();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nodeID->isRoot())
|
if (nodeID->isRoot())
|
||||||
san += map.addRootNode(
|
{
|
||||||
rootHash, makeSlice(node.nodedata()), filter.get());
|
san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
san += map.addKnownNode(
|
{
|
||||||
*nodeID, makeSlice(node.nodedata()), filter.get());
|
san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
|
||||||
|
}
|
||||||
|
|
||||||
if (!san.isGood())
|
if (!san.isGood())
|
||||||
{
|
{
|
||||||
@@ -1120,19 +1124,19 @@ InboundLedger::processData(
|
|||||||
std::shared_ptr<Peer> peer,
|
std::shared_ptr<Peer> peer,
|
||||||
protocol::TMLedgerData& packet)
|
protocol::TMLedgerData& packet)
|
||||||
{
|
{
|
||||||
ScopedLockType sl(mtx_);
|
|
||||||
|
|
||||||
if (packet.type() == protocol::liBASE)
|
if (packet.type() == protocol::liBASE)
|
||||||
{
|
{
|
||||||
if (packet.nodes_size() < 1)
|
if (packet.nodes().empty())
|
||||||
{
|
{
|
||||||
JLOG(journal_.warn()) << "Got empty header data";
|
JLOG(journal_.warn()) << peer->id() << ": empty header data";
|
||||||
peer->charge(Resource::feeInvalidRequest);
|
peer->charge(Resource::feeInvalidRequest);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SHAMapAddNode san;
|
SHAMapAddNode san;
|
||||||
|
|
||||||
|
ScopedLockType sl(mtx_);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!mHaveHeader)
|
if (!mHaveHeader)
|
||||||
@@ -1177,13 +1181,18 @@ InboundLedger::processData(
|
|||||||
if ((packet.type() == protocol::liTX_NODE) ||
|
if ((packet.type() == protocol::liTX_NODE) ||
|
||||||
(packet.type() == protocol::liAS_NODE))
|
(packet.type() == protocol::liAS_NODE))
|
||||||
{
|
{
|
||||||
if (packet.nodes().size() == 0)
|
std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
|
||||||
|
: "liAS_NODE: ";
|
||||||
|
|
||||||
|
if (packet.nodes().empty())
|
||||||
{
|
{
|
||||||
JLOG(journal_.info()) << "Got response with no nodes";
|
JLOG(journal_.info()) << peer->id() << ": response with no nodes";
|
||||||
peer->charge(Resource::feeInvalidRequest);
|
peer->charge(Resource::feeInvalidRequest);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ScopedLockType sl(mtx_);
|
||||||
|
|
||||||
// Verify node IDs and data are complete
|
// Verify node IDs and data are complete
|
||||||
for (auto const& node : packet.nodes())
|
for (auto const& node : packet.nodes())
|
||||||
{
|
{
|
||||||
@@ -1198,14 +1207,10 @@ InboundLedger::processData(
|
|||||||
SHAMapAddNode san;
|
SHAMapAddNode san;
|
||||||
receiveNode(packet, san);
|
receiveNode(packet, san);
|
||||||
|
|
||||||
if (packet.type() == protocol::liTX_NODE)
|
JLOG(journal_.debug())
|
||||||
{
|
<< "Ledger "
|
||||||
JLOG(journal_.debug()) << "Ledger TX node stats: " << san.get();
|
<< ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
|
||||||
}
|
<< " node stats: " << san.get();
|
||||||
else
|
|
||||||
{
|
|
||||||
JLOG(journal_.debug()) << "Ledger AS node stats: " << san.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (san.isUseful())
|
if (san.isUseful())
|
||||||
progress_ = true;
|
progress_ = true;
|
||||||
@@ -1217,20 +1222,89 @@ InboundLedger::processData(
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace detail {
|
||||||
|
// Track the amount of useful data that each peer returns
|
||||||
|
struct PeerDataCounts
|
||||||
|
{
|
||||||
|
// Map from peer to amount of useful the peer returned
|
||||||
|
std::unordered_map<std::shared_ptr<Peer>, int> counts;
|
||||||
|
// The largest amount of useful data that any peer returned
|
||||||
|
int maxCount = 0;
|
||||||
|
|
||||||
|
// Update the data count for a peer
|
||||||
|
void
|
||||||
|
update(std::shared_ptr<Peer>&& peer, int dataCount)
|
||||||
|
{
|
||||||
|
if (dataCount <= 0)
|
||||||
|
return;
|
||||||
|
maxCount = std::max(maxCount, dataCount);
|
||||||
|
auto i = counts.find(peer);
|
||||||
|
if (i == counts.end())
|
||||||
|
{
|
||||||
|
counts.emplace(std::move(peer), dataCount);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
i->second = std::max(i->second, dataCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prune all the peers that didn't return enough data.
|
||||||
|
void
|
||||||
|
prune()
|
||||||
|
{
|
||||||
|
// Remove all the peers that didn't return at least half as much data as
|
||||||
|
// the best peer
|
||||||
|
auto const thresh = maxCount / 2;
|
||||||
|
auto i = counts.begin();
|
||||||
|
while (i != counts.end())
|
||||||
|
{
|
||||||
|
if (i->second < thresh)
|
||||||
|
i = counts.erase(i);
|
||||||
|
else
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// call F with the `peer` parameter with a random sample of at most n values
|
||||||
|
// of the counts vector.
|
||||||
|
template <class F>
|
||||||
|
void
|
||||||
|
sampleN(std::size_t n, F&& f)
|
||||||
|
{
|
||||||
|
if (counts.empty())
|
||||||
|
return;
|
||||||
|
|
||||||
|
std::minstd_rand rng{std::random_device{}()};
|
||||||
|
std::sample(
|
||||||
|
counts.begin(),
|
||||||
|
counts.end(),
|
||||||
|
boost::make_function_output_iterator(
|
||||||
|
[&f](auto&& v) { f(v.first); }),
|
||||||
|
n,
|
||||||
|
rng);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace detail
|
||||||
|
|
||||||
/** Process pending TMLedgerData
|
/** Process pending TMLedgerData
|
||||||
Query the 'best' peer
|
Query the a random sample of the 'best' peers
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InboundLedger::runData()
|
InboundLedger::runData()
|
||||||
{
|
{
|
||||||
std::shared_ptr<Peer> chosenPeer;
|
// Maximum number of peers to request data from
|
||||||
int chosenPeerCount = -1;
|
constexpr std::size_t maxUsefulPeers = 6;
|
||||||
|
|
||||||
std::vector<PeerDataPairType> data;
|
decltype(mReceivedData) data;
|
||||||
|
|
||||||
|
// Reserve some memory so the first couple iterations don't reallocate
|
||||||
|
data.reserve(8);
|
||||||
|
|
||||||
|
detail::PeerDataCounts dataCounts;
|
||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
data.clear();
|
data.clear();
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard sl(mReceivedDataLock);
|
std::lock_guard sl(mReceivedDataLock);
|
||||||
|
|
||||||
@@ -1243,24 +1317,22 @@ InboundLedger::runData()
|
|||||||
data.swap(mReceivedData);
|
data.swap(mReceivedData);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select the peer that gives us the most nodes that are useful,
|
|
||||||
// breaking ties in favor of the peer that responded first.
|
|
||||||
for (auto& entry : data)
|
for (auto& entry : data)
|
||||||
{
|
{
|
||||||
if (auto peer = entry.first.lock())
|
if (auto peer = entry.first.lock())
|
||||||
{
|
{
|
||||||
int count = processData(peer, *(entry.second));
|
int count = processData(peer, *(entry.second));
|
||||||
if (count > chosenPeerCount)
|
dataCounts.update(std::move(peer), count);
|
||||||
{
|
|
||||||
chosenPeerCount = count;
|
|
||||||
chosenPeer = std::move(peer);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chosenPeer)
|
// Select a random sample of the peers that gives us the most nodes that are
|
||||||
trigger(chosenPeer, TriggerReason::reply);
|
// useful
|
||||||
|
dataCounts.prune();
|
||||||
|
dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
|
||||||
|
trigger(peer, TriggerReason::reply);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Json::Value
|
Json::Value
|
||||||
|
|||||||
@@ -74,6 +74,12 @@ public:
|
|||||||
reason != InboundLedger::Reason::SHARD ||
|
reason != InboundLedger::Reason::SHARD ||
|
||||||
(seq != 0 && app_.getShardStore()));
|
(seq != 0 && app_.getShardStore()));
|
||||||
|
|
||||||
|
// probably not the right rule
|
||||||
|
if (app_.getOPs().isNeedNetworkLedger() &&
|
||||||
|
(reason != InboundLedger::Reason::GENERIC) &&
|
||||||
|
(reason != InboundLedger::Reason::CONSENSUS))
|
||||||
|
return {};
|
||||||
|
|
||||||
bool isNew = true;
|
bool isNew = true;
|
||||||
std::shared_ptr<InboundLedger> inbound;
|
std::shared_ptr<InboundLedger> inbound;
|
||||||
{
|
{
|
||||||
@@ -82,6 +88,7 @@ public:
|
|||||||
{
|
{
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto it = mLedgers.find(hash);
|
auto it = mLedgers.find(hash);
|
||||||
if (it != mLedgers.end())
|
if (it != mLedgers.end())
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -60,11 +60,11 @@ enum JobType {
|
|||||||
jtLEDGER_REQ, // Peer request ledger/txnset data
|
jtLEDGER_REQ, // Peer request ledger/txnset data
|
||||||
jtPROPOSAL_ut, // A proposal from an untrusted source
|
jtPROPOSAL_ut, // A proposal from an untrusted source
|
||||||
jtREPLAY_TASK, // A Ledger replay task/subtask
|
jtREPLAY_TASK, // A Ledger replay task/subtask
|
||||||
jtLEDGER_DATA, // Received data for a ledger we're acquiring
|
|
||||||
jtTRANSACTION, // A transaction received from the network
|
jtTRANSACTION, // A transaction received from the network
|
||||||
jtMISSING_TXN, // Request missing transactions
|
jtMISSING_TXN, // Request missing transactions
|
||||||
jtREQUESTED_TXN, // Reply with requested transactions
|
jtREQUESTED_TXN, // Reply with requested transactions
|
||||||
jtBATCH, // Apply batched transactions
|
jtBATCH, // Apply batched transactions
|
||||||
|
jtLEDGER_DATA, // Received data for a ledger we're acquiring
|
||||||
jtADVANCE, // Advance validated/acquired ledgers
|
jtADVANCE, // Advance validated/acquired ledgers
|
||||||
jtPUBLEDGER, // Publish a fully-accepted ledger
|
jtPUBLEDGER, // Publish a fully-accepted ledger
|
||||||
jtTXN_DATA, // Fetch a proposed set
|
jtTXN_DATA, // Fetch a proposed set
|
||||||
|
|||||||
@@ -1339,7 +1339,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)
|
|||||||
// case ShardState::finalized:
|
// case ShardState::finalized:
|
||||||
default:
|
default:
|
||||||
return badData("Invalid incomplete shard state");
|
return badData("Invalid incomplete shard state");
|
||||||
};
|
}
|
||||||
s.add32(incomplete.state());
|
s.add32(incomplete.state());
|
||||||
|
|
||||||
// Verify progress
|
// Verify progress
|
||||||
@@ -3523,8 +3523,8 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
|
|||||||
{
|
{
|
||||||
auto const queryDepth{
|
auto const queryDepth{
|
||||||
m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
|
m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
|
||||||
std::vector<SHAMapNodeID> nodeIds;
|
|
||||||
std::vector<Blob> rawNodes;
|
std::vector<std::pair<SHAMapNodeID, Blob>> data;
|
||||||
|
|
||||||
for (int i = 0; i < m->nodeids_size() &&
|
for (int i = 0; i < m->nodeids_size() &&
|
||||||
ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
|
ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
|
||||||
@@ -3532,30 +3532,22 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
|
|||||||
{
|
{
|
||||||
auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
|
auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
|
||||||
|
|
||||||
nodeIds.clear();
|
data.clear();
|
||||||
rawNodes.clear();
|
data.reserve(Tuning::softMaxReplyNodes);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (map->getNodeFat(
|
if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
|
||||||
*shaMapNodeId,
|
|
||||||
nodeIds,
|
|
||||||
rawNodes,
|
|
||||||
fatLeaves,
|
|
||||||
queryDepth))
|
|
||||||
{
|
{
|
||||||
assert(nodeIds.size() == rawNodes.size());
|
|
||||||
JLOG(p_journal_.trace())
|
JLOG(p_journal_.trace())
|
||||||
<< "processLedgerRequest: getNodeFat got "
|
<< "processLedgerRequest: getNodeFat got "
|
||||||
<< rawNodes.size() << " nodes";
|
<< data.size() << " nodes";
|
||||||
|
|
||||||
auto rawNodeIter{rawNodes.begin()};
|
for (auto const& d : data)
|
||||||
for (auto const& nodeId : nodeIds)
|
|
||||||
{
|
{
|
||||||
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
|
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
|
||||||
node->set_nodeid(nodeId.getRawString());
|
node->set_nodeid(d.first.getRawString());
|
||||||
node->set_nodedata(
|
node->set_nodedata(d.second.data(), d.second.size());
|
||||||
&rawNodeIter->front(), rawNodeIter->size());
|
|
||||||
++rawNodeIter;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -3607,9 +3599,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
|
|||||||
<< ledgerData.nodes_size() << " nodes";
|
<< ledgerData.nodes_size() << " nodes";
|
||||||
}
|
}
|
||||||
|
|
||||||
auto message{
|
send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
|
||||||
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
|
|
||||||
send(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
|
|||||||
@@ -238,7 +238,7 @@ public:
|
|||||||
void
|
void
|
||||||
visitDifferences(
|
visitDifferences(
|
||||||
SHAMap const* have,
|
SHAMap const* have,
|
||||||
std::function<bool(SHAMapTreeNode const&)>) const;
|
std::function<bool(SHAMapTreeNode const&)> const&) const;
|
||||||
|
|
||||||
/** Visit every leaf node in this SHAMap
|
/** Visit every leaf node in this SHAMap
|
||||||
|
|
||||||
@@ -267,8 +267,7 @@ public:
|
|||||||
bool
|
bool
|
||||||
getNodeFat(
|
getNodeFat(
|
||||||
SHAMapNodeID const& wanted,
|
SHAMapNodeID const& wanted,
|
||||||
std::vector<SHAMapNodeID>& nodeIDs,
|
std::vector<std::pair<SHAMapNodeID, Blob>>& data,
|
||||||
std::vector<Blob>& rawNodes,
|
|
||||||
bool fatLeaves,
|
bool fatLeaves,
|
||||||
std::uint32_t depth) const;
|
std::uint32_t depth) const;
|
||||||
|
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ SHAMap::visitNodes(std::function<bool(SHAMapTreeNode&)> const& function) const
|
|||||||
auto node = std::static_pointer_cast<SHAMapInnerNode>(root_);
|
auto node = std::static_pointer_cast<SHAMapInnerNode>(root_);
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
|
|
||||||
while (1)
|
while (true)
|
||||||
{
|
{
|
||||||
while (pos < 16)
|
while (pos < 16)
|
||||||
{
|
{
|
||||||
@@ -99,7 +99,7 @@ SHAMap::visitNodes(std::function<bool(SHAMapTreeNode&)> const& function) const
|
|||||||
void
|
void
|
||||||
SHAMap::visitDifferences(
|
SHAMap::visitDifferences(
|
||||||
SHAMap const* have,
|
SHAMap const* have,
|
||||||
std::function<bool(SHAMapTreeNode const&)> function) const
|
std::function<bool(SHAMapTreeNode const&)> const& function) const
|
||||||
{
|
{
|
||||||
// Visit every node in this SHAMap that is not present
|
// Visit every node in this SHAMap that is not present
|
||||||
// in the specified SHAMap
|
// in the specified SHAMap
|
||||||
@@ -426,8 +426,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
|
|||||||
bool
|
bool
|
||||||
SHAMap::getNodeFat(
|
SHAMap::getNodeFat(
|
||||||
SHAMapNodeID const& wanted,
|
SHAMapNodeID const& wanted,
|
||||||
std::vector<SHAMapNodeID>& nodeIDs,
|
std::vector<std::pair<SHAMapNodeID, Blob>>& data,
|
||||||
std::vector<Blob>& rawNodes,
|
|
||||||
bool fatLeaves,
|
bool fatLeaves,
|
||||||
std::uint32_t depth) const
|
std::uint32_t depth) const
|
||||||
{
|
{
|
||||||
@@ -443,16 +442,15 @@ SHAMap::getNodeFat(
|
|||||||
auto inner = static_cast<SHAMapInnerNode*>(node);
|
auto inner = static_cast<SHAMapInnerNode*>(node);
|
||||||
if (inner->isEmptyBranch(branch))
|
if (inner->isEmptyBranch(branch))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
node = descendThrow(inner, branch);
|
node = descendThrow(inner, branch);
|
||||||
nodeID = nodeID.getChildNodeID(branch);
|
nodeID = nodeID.getChildNodeID(branch);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (node == nullptr || wanted != nodeID)
|
if (node == nullptr || wanted != nodeID)
|
||||||
{
|
{
|
||||||
JLOG(journal_.warn()) << "peer requested node that is not in the map:\n"
|
JLOG(journal_.info())
|
||||||
<< wanted << " but found\n"
|
<< "peer requested node that is not in the map: " << wanted
|
||||||
<< nodeID;
|
<< " but found " << nodeID;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -465,18 +463,17 @@ SHAMap::getNodeFat(
|
|||||||
std::stack<std::tuple<SHAMapTreeNode*, SHAMapNodeID, int>> stack;
|
std::stack<std::tuple<SHAMapTreeNode*, SHAMapNodeID, int>> stack;
|
||||||
stack.emplace(node, nodeID, depth);
|
stack.emplace(node, nodeID, depth);
|
||||||
|
|
||||||
|
Serializer s(8192);
|
||||||
|
|
||||||
while (!stack.empty())
|
while (!stack.empty())
|
||||||
{
|
{
|
||||||
std::tie(node, nodeID, depth) = stack.top();
|
std::tie(node, nodeID, depth) = stack.top();
|
||||||
stack.pop();
|
stack.pop();
|
||||||
|
|
||||||
{
|
// Add this node to the reply
|
||||||
// Add this node to the reply
|
s.erase();
|
||||||
Serializer s;
|
node->serializeForWire(s);
|
||||||
node->serializeForWire(s);
|
data.emplace_back(std::make_pair(nodeID, s.getData()));
|
||||||
nodeIDs.push_back(nodeID);
|
|
||||||
rawNodes.push_back(std::move(s.modData()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (node->isInner())
|
if (node->isInner())
|
||||||
{
|
{
|
||||||
@@ -484,6 +481,7 @@ SHAMap::getNodeFat(
|
|||||||
// without decrementing the depth
|
// without decrementing the depth
|
||||||
auto inner = static_cast<SHAMapInnerNode*>(node);
|
auto inner = static_cast<SHAMapInnerNode*>(node);
|
||||||
int bc = inner->getBranchCount();
|
int bc = inner->getBranchCount();
|
||||||
|
|
||||||
if ((depth > 0) || (bc == 1))
|
if ((depth > 0) || (bc == 1))
|
||||||
{
|
{
|
||||||
// We need to process this node's children
|
// We need to process this node's children
|
||||||
@@ -492,7 +490,7 @@ SHAMap::getNodeFat(
|
|||||||
if (!inner->isEmptyBranch(i))
|
if (!inner->isEmptyBranch(i))
|
||||||
{
|
{
|
||||||
auto const childNode = descendThrow(inner, i);
|
auto const childNode = descendThrow(inner, i);
|
||||||
SHAMapNodeID const childID = nodeID.getChildNodeID(i);
|
auto const childID = nodeID.getChildNodeID(i);
|
||||||
|
|
||||||
if (childNode->isInner() && ((depth > 1) || (bc == 1)))
|
if (childNode->isInner() && ((depth > 1) || (bc == 1)))
|
||||||
{
|
{
|
||||||
@@ -506,10 +504,10 @@ SHAMap::getNodeFat(
|
|||||||
else if (childNode->isInner() || fatLeaves)
|
else if (childNode->isInner() || fatLeaves)
|
||||||
{
|
{
|
||||||
// Just include this node
|
// Just include this node
|
||||||
Serializer ns;
|
s.erase();
|
||||||
childNode->serializeForWire(ns);
|
childNode->serializeForWire(s);
|
||||||
nodeIDs.push_back(childID);
|
data.emplace_back(
|
||||||
rawNodes.push_back(std::move(ns.modData()));
|
std::make_pair(childID, s.getData()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -124,24 +124,18 @@ public:
|
|||||||
destination.setSynching();
|
destination.setSynching();
|
||||||
|
|
||||||
{
|
{
|
||||||
std::vector<SHAMapNodeID> gotNodeIDs_a;
|
std::vector<std::pair<SHAMapNodeID, Blob>> a;
|
||||||
std::vector<Blob> gotNodes_a;
|
|
||||||
|
|
||||||
BEAST_EXPECT(source.getNodeFat(
|
BEAST_EXPECT(source.getNodeFat(
|
||||||
SHAMapNodeID(),
|
SHAMapNodeID(), a, rand_bool(eng_), rand_int(eng_, 2)));
|
||||||
gotNodeIDs_a,
|
|
||||||
gotNodes_a,
|
|
||||||
rand_bool(eng_),
|
|
||||||
rand_int(eng_, 2)));
|
|
||||||
|
|
||||||
unexpected(gotNodes_a.size() < 1, "NodeSize");
|
unexpected(a.size() < 1, "NodeSize");
|
||||||
|
|
||||||
BEAST_EXPECT(destination
|
BEAST_EXPECT(
|
||||||
.addRootNode(
|
destination
|
||||||
source.getHash(),
|
.addRootNode(
|
||||||
makeSlice(*gotNodes_a.begin()),
|
source.getHash(), makeSlice(a[0].second), nullptr)
|
||||||
nullptr)
|
.isGood());
|
||||||
.isGood());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
do
|
do
|
||||||
@@ -155,8 +149,7 @@ public:
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
// get as many nodes as possible based on this information
|
// get as many nodes as possible based on this information
|
||||||
std::vector<SHAMapNodeID> gotNodeIDs_b;
|
std::vector<std::pair<SHAMapNodeID, Blob>> b;
|
||||||
std::vector<Blob> gotNodes_b;
|
|
||||||
|
|
||||||
for (auto& it : nodesMissing)
|
for (auto& it : nodesMissing)
|
||||||
{
|
{
|
||||||
@@ -164,29 +157,24 @@ public:
|
|||||||
// non-deterministic number of times and the number of tests run
|
// non-deterministic number of times and the number of tests run
|
||||||
// should be deterministic
|
// should be deterministic
|
||||||
if (!source.getNodeFat(
|
if (!source.getNodeFat(
|
||||||
it.first,
|
it.first, b, rand_bool(eng_), rand_int(eng_, 2)))
|
||||||
gotNodeIDs_b,
|
|
||||||
gotNodes_b,
|
|
||||||
rand_bool(eng_),
|
|
||||||
rand_int(eng_, 2)))
|
|
||||||
fail("", __FILE__, __LINE__);
|
fail("", __FILE__, __LINE__);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't use BEAST_EXPECT here b/c it will be called a
|
// Don't use BEAST_EXPECT here b/c it will be called a
|
||||||
// non-deterministic number of times and the number of tests run
|
// non-deterministic number of times and the number of tests run
|
||||||
// should be deterministic
|
// should be deterministic
|
||||||
if (gotNodeIDs_b.size() != gotNodes_b.size() ||
|
if (b.empty())
|
||||||
gotNodeIDs_b.empty())
|
|
||||||
fail("", __FILE__, __LINE__);
|
fail("", __FILE__, __LINE__);
|
||||||
|
|
||||||
for (std::size_t i = 0; i < gotNodeIDs_b.size(); ++i)
|
for (std::size_t i = 0; i < b.size(); ++i)
|
||||||
{
|
{
|
||||||
// Don't use BEAST_EXPECT here b/c it will be called a
|
// Don't use BEAST_EXPECT here b/c it will be called a
|
||||||
// non-deterministic number of times and the number of tests run
|
// non-deterministic number of times and the number of tests run
|
||||||
// should be deterministic
|
// should be deterministic
|
||||||
if (!destination
|
if (!destination
|
||||||
.addKnownNode(
|
.addKnownNode(
|
||||||
gotNodeIDs_b[i], makeSlice(gotNodes_b[i]), nullptr)
|
b[i].first, makeSlice(b[i].second), nullptr)
|
||||||
.isUseful())
|
.isUseful())
|
||||||
fail("", __FILE__, __LINE__);
|
fail("", __FILE__, __LINE__);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user