Implement ledger forward replay

This commit is contained in:
Peng Wang
2020-07-20 10:51:39 -04:00
committed by manojsdoshi
parent 49409dbf27
commit 7e97bfce10
58 changed files with 5243 additions and 429 deletions

View File

@@ -21,6 +21,7 @@
#define RIPPLE_APP_LEDGER_INBOUNDLEDGER_H_INCLUDED
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/impl/TimeoutCounter.h>
#include <ripple/app/main/Application.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/overlay/PeerSet.h>
@@ -31,7 +32,7 @@
namespace ripple {
// A ledger we are trying to acquire
class InboundLedger final : public PeerSet,
class InboundLedger final : public TimeoutCounter,
public std::enable_shared_from_this<InboundLedger>,
public CountedObject<InboundLedger>
{
@@ -54,7 +55,8 @@ public:
uint256 const& hash,
std::uint32_t seq,
Reason reason,
clock_type&);
clock_type&,
std::unique_ptr<PeerSet> peerSet);
~InboundLedger();
@@ -66,14 +68,14 @@ public:
bool
isComplete() const
{
return mComplete;
return complete_;
}
/** Returns false if we failed to get the data. */
bool
isFailed() const
{
return mFailed;
return failed_;
}
std::shared_ptr<Ledger const>
@@ -146,22 +148,10 @@ private:
void
onTimer(bool progress, ScopedLockType& peerSetLock) override;
void
queueJob() override;
void
onPeerAdded(std::shared_ptr<Peer> const& peer) override
{
// For historical nodes, do not trigger too soon
// since a fetch pack is probably coming
if (mReason != Reason::HISTORY)
trigger(peer, TriggerReason::added);
}
std::size_t
getPeerCount() const;
std::weak_ptr<PeerSet>
std::weak_ptr<TimeoutCounter>
pmDowncast() override;
int
@@ -205,6 +195,7 @@ private:
std::mutex mReceivedDataLock;
std::vector<PeerDataPairType> mReceivedData;
bool mReceiveDispatched;
std::unique_ptr<PeerSet> mPeerSet;
};
/** Deserialize a ledger header from a byte array. */

View File

@@ -61,7 +61,7 @@ namespace ripple {
create_genesis_t const create_genesis{};
static uint256
uint256
calculateLedgerHash(LedgerInfo const& info)
{
// VFALCO This has to match addRaw in View.h.

View File

@@ -508,6 +508,9 @@ deserializeTx(SHAMapItem const& item);
std::pair<std::shared_ptr<STTx const>, std::shared_ptr<STObject const>>
deserializeTxPlusMeta(SHAMapItem const& item);
uint256
calculateLedgerHash(LedgerInfo const& info);
} // namespace ripple
#endif

View File

@@ -40,6 +40,11 @@ public:
std::shared_ptr<Ledger const> parent,
std::shared_ptr<Ledger const> replay);
LedgerReplay(
std::shared_ptr<Ledger const> parent,
std::shared_ptr<Ledger const> replay,
std::map<std::uint32_t, std::shared_ptr<STTx const>>&& orderedTxns);
/** @return The parent of the ledger to replay
*/
std::shared_ptr<Ledger const> const&

View File

@@ -0,0 +1,192 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_LEDGERREPLAYTASK_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERREPLAYTASK_H_INCLUDED
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/app/ledger/impl/TimeoutCounter.h>
#include <ripple/app/main/Application.h>
#include <memory>
#include <vector>
namespace ripple {
class InboundLedgers;
class Ledger;
class LedgerDeltaAcquire;
class LedgerReplayer;
class SkipListAcquire;
namespace test {
class LedgerReplayClient;
} // namespace test
class LedgerReplayTask final
: public TimeoutCounter,
public std::enable_shared_from_this<LedgerReplayTask>,
public CountedObject<LedgerReplayTask>
{
public:
class TaskParameter
{
public:
// set on construct
InboundLedger::Reason reason_;
uint256 finishHash_;
std::uint32_t totalLedgers_; // including the start and the finish
// to be updated
std::uint32_t finishSeq_ = 0;
std::vector<uint256> skipList_ = {}; // including the finishHash
uint256 startHash_ = {};
std::uint32_t startSeq_ = 0;
bool full_ = false;
/**
* constructor
* @param r the reason of the task
* @param finishLedgerHash hash of the last ledger in the range
* @param totalNumLedgers number of ledgers to download
*/
TaskParameter(
InboundLedger::Reason r,
uint256 const& finishLedgerHash,
std::uint32_t totalNumLedgers);
/**
* fill all the fields that was not filled during construction
* @note called with verified skip list data
* @param hash hash of the ledger that has the skip list
* @param seq sequence number of the ledger that has the skip list
* @param sList skip list
* @return false if error (e.g. hash mismatch)
* true on success
*/
bool
update(
uint256 const& hash,
std::uint32_t seq,
std::vector<uint256> const& sList);
/** check if this task can be merged into an existing task */
bool
canMergeInto(TaskParameter const& existingTask) const;
};
/**
* Constructor
* @param app Application reference
* @param inboundLedgers InboundLedgers reference
* @param replayer LedgerReplayer reference
* @param skipListAcquirer shared_ptr of SkipListAcquire subtask,
* to make sure it will not be destroyed.
* @param parameter parameter of the task
*/
LedgerReplayTask(
Application& app,
InboundLedgers& inboundLedgers,
LedgerReplayer& replayer,
std::shared_ptr<SkipListAcquire>& skipListAcquirer,
TaskParameter&& parameter);
~LedgerReplayTask();
/** Start the task */
void
init();
/**
* add a new LedgerDeltaAcquire subtask
* @param delta the new LedgerDeltaAcquire subtask
* @note the LedgerDeltaAcquire subtasks must be added in order
*/
void
addDelta(std::shared_ptr<LedgerDeltaAcquire> const& delta);
TaskParameter const&
getTaskParameter() const
{
return parameter_;
}
/** return if the task is finished */
bool
finished() const;
static char const*
getCountedObjectName()
{
return "LedgerReplayTask";
}
private:
void
onTimer(bool progress, ScopedLockType& sl) override;
std::weak_ptr<TimeoutCounter>
pmDowncast() override;
/**
* Update this task (by a SkipListAcquire subtask) when skip list is ready
* @param hash hash of the ledger that has the skip list
* @param seq sequence number of the ledger that has the skip list
* @param sList skip list
*/
void
updateSkipList(
uint256 const& hash,
std::uint32_t seq,
std::vector<uint256> const& sList);
/**
* Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready
* @param deltaHash ledger hash of the delta
*/
void
deltaReady(uint256 const& deltaHash);
/**
* Trigger another round
* @param sl lock. this function must be called with the lock
*/
void
trigger(ScopedLockType& sl);
/**
* Try to build more ledgers
* @param sl lock. this function must be called with the lock
*/
void
tryAdvance(ScopedLockType& sl);
InboundLedgers& inboundLedgers_;
LedgerReplayer& replayer_;
TaskParameter parameter_;
uint32_t maxTimeouts_;
std::shared_ptr<SkipListAcquire> skipListAcquirer_;
std::shared_ptr<Ledger const> parent_ = {};
uint32_t deltaToBuild_ = 0; // should not build until have parent
std::vector<std::shared_ptr<LedgerDeltaAcquire>> deltas_;
friend class test::LedgerReplayClient;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,146 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_LEDGERREPLAYER_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERREPLAYER_H_INCLUDED
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerReplayTask.h>
#include <ripple/app/main/Application.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/core/Stoppable.h>
#include <memory>
#include <mutex>
#include <vector>
namespace ripple {
namespace test {
class LedgerReplayClient;
} // namespace test
namespace LedgerReplayParameters {
// timeout value for LedgerReplayTask
auto constexpr TASK_TIMEOUT = std::chrono::milliseconds{500};
// for LedgerReplayTask to calculate max allowed timeouts
// = max( TASK_MAX_TIMEOUTS_MINIMUM,
// (# of ledger to replay) * TASK_MAX_TIMEOUTS_MULTIPLIER)
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MULTIPLIER = 2;
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MINIMUM = 10;
// timeout value for subtasks: LedgerDeltaAcquire and SkipListAcquire
auto constexpr SUB_TASK_TIMEOUT = std::chrono::milliseconds{250};
// max of allowed subtask timeouts
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS = 10;
// max number of peers that do not support the ledger replay feature
// returned by the PeerSet before switch to fallback
auto constexpr MAX_NO_FEATURE_PEER_COUNT = 2;
// subtask timeout value after fallback
auto constexpr SUB_TASK_FALLBACK_TIMEOUT = std::chrono::milliseconds{1000};
// for LedgerReplayer to limit the number of LedgerReplayTask
std::uint32_t constexpr MAX_TASKS = 10;
// for LedgerReplayer to limit the number of ledgers to replay in one task
std::uint32_t constexpr MAX_TASK_SIZE = 256;
// to limit the number of LedgerReplay related jobs in JobQueue
std::uint32_t constexpr MAX_QUEUED_TASKS = 100;
} // namespace LedgerReplayParameters
/**
* Manages the lifetime of ledger replay tasks.
*/
class LedgerReplayer final : public Stoppable
{
public:
LedgerReplayer(
Application& app,
InboundLedgers& inboundLedgers,
std::unique_ptr<PeerSetBuilder> peerSetBuilder,
Stoppable& parent);
~LedgerReplayer();
/**
* Replay a range of ledgers
* @param r reason for the replay request
* @param finishLedgerHash hash of the last ledger
* @param totalNumLedgers total number of ledgers in the range, inclusive
* @note totalNumLedgers must > 0 && totalNumLedgers must <= 256
*/
void
replay(
InboundLedger::Reason r,
uint256 const& finishLedgerHash,
std::uint32_t totalNumLedgers);
/** Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task */
void
createDeltas(std::shared_ptr<LedgerReplayTask> task);
/**
* Process a skip list (extracted from a TMProofPathResponse message)
* @param info ledger info
* @param data skip list holder
* @note info and data must have been verified against the ledger hash
*/
void
gotSkipList(
LedgerInfo const& info,
std::shared_ptr<SHAMapItem const> const& data);
/**
* Process a ledger delta (extracted from a TMReplayDeltaResponse message)
* @param info ledger info
* @param txns set of Txns of the ledger
* @note info and txns must have been verified against the ledger hash
*/
void
gotReplayDelta(
LedgerInfo const& info,
std::map<std::uint32_t, std::shared_ptr<STTx const>>&& txns);
/** Remove completed tasks */
void
sweep();
void
onStop() override;
private:
mutable std::mutex mtx_;
std::vector<std::shared_ptr<LedgerReplayTask>> tasks_;
hash_map<uint256, std::weak_ptr<LedgerDeltaAcquire>> deltas_;
hash_map<uint256, std::weak_ptr<SkipListAcquire>> skipLists_;
Application& app_;
InboundLedgers& inboundLedgers_;
std::unique_ptr<PeerSetBuilder> peerSetBuilder_;
beast::Journal j_;
friend class test::LedgerReplayClient;
};
} // namespace ripple
#endif

View File

@@ -76,8 +76,14 @@ InboundLedger::InboundLedger(
uint256 const& hash,
std::uint32_t seq,
Reason reason,
clock_type& clock)
: PeerSet(app, hash, ledgerAcquireTimeout, app.journal("InboundLedger"))
clock_type& clock,
std::unique_ptr<PeerSet> peerSet)
: TimeoutCounter(
app,
hash,
ledgerAcquireTimeout,
{jtLEDGER_DATA, "InboundLedger", 5},
app.journal("InboundLedger"))
, m_clock(clock)
, mHaveHeader(false)
, mHaveState(false)
@@ -87,31 +93,32 @@ InboundLedger::InboundLedger(
, mSeq(seq)
, mReason(reason)
, mReceiveDispatched(false)
, mPeerSet(std::move(peerSet))
{
JLOG(m_journal.trace()) << "Acquiring ledger " << mHash;
JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
touch();
}
void
InboundLedger::init(ScopedLockType& collectionLock)
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
collectionLock.unlock();
tryDB(app_.getNodeFamily().db());
if (mFailed)
if (failed_)
return;
if (!mComplete)
if (!complete_)
{
auto shardStore = app_.getShardStore();
if (mReason == Reason::SHARD)
{
if (!shardStore)
{
JLOG(m_journal.error())
JLOG(journal_.error())
<< "Acquiring shard with no shard store available";
mFailed = true;
failed_ = true;
return;
}
@@ -121,30 +128,30 @@ InboundLedger::init(ScopedLockType& collectionLock)
mLedger.reset();
tryDB(app_.getShardFamily()->db());
if (mFailed)
if (failed_)
return;
}
else if (shardStore && mSeq >= shardStore->earliestLedgerSeq())
{
if (auto l = shardStore->fetchLedger(mHash, mSeq))
if (auto l = shardStore->fetchLedger(hash_, mSeq))
{
mHaveHeader = true;
mHaveTransactions = true;
mHaveState = true;
mComplete = true;
complete_ = true;
mLedger = std::move(l);
}
}
}
if (!mComplete)
if (!complete_)
{
addPeers();
queueJob();
queueJob(sl);
return;
}
JLOG(m_journal.debug()) << "Acquiring ledger we already have in "
<< " local store. " << mHash;
JLOG(journal_.debug()) << "Acquiring ledger we already have in "
<< " local store. " << hash_;
mLedger->setImmutable(app_.config());
if (mReason == Reason::HISTORY || mReason == Reason::SHARD)
@@ -160,31 +167,16 @@ InboundLedger::init(ScopedLockType& collectionLock)
std::size_t
InboundLedger::getPeerCount() const
{
return std::count_if(mPeers.begin(), mPeers.end(), [this](auto id) {
return app_.overlay().findPeerByShortID(id) != nullptr;
auto const& peerIds = mPeerSet->getPeerIds();
return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
return (app_.overlay().findPeerByShortID(id) != nullptr);
});
}
void
InboundLedger::queueJob()
{
if (app_.getJobQueue().getJobCountTotal(jtLEDGER_DATA) > 4)
{
JLOG(m_journal.debug()) << "Deferring InboundLedger timer due to load";
setTimer();
return;
}
app_.getJobQueue().addJob(
jtLEDGER_DATA, "InboundLedger", [ptr = shared_from_this()](Job&) {
ptr->invokeOnTimer();
});
}
void
InboundLedger::update(std::uint32_t seq)
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
// If we didn't know the sequence number, but now do, save it
if ((seq != 0) && (mSeq == 0))
@@ -197,7 +189,7 @@ InboundLedger::update(std::uint32_t seq)
bool
InboundLedger::checkLocal()
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
if (!isDone())
{
if (mLedger)
@@ -206,7 +198,7 @@ InboundLedger::checkLocal()
tryDB(app_.getShardFamily()->db());
else
tryDB(app_.getNodeFamily().db());
if (mFailed || mComplete)
if (failed_ || complete_)
{
done();
return true;
@@ -226,11 +218,11 @@ InboundLedger::~InboundLedger()
}
if (!isDone())
{
JLOG(m_journal.debug())
<< "Acquire " << mHash << " abort "
<< ((mTimeouts == 0) ? std::string()
JLOG(journal_.debug())
<< "Acquire " << hash_ << " abort "
<< ((timeouts_ == 0) ? std::string()
: (std::string("timeouts:") +
std::to_string(mTimeouts) + " "))
std::to_string(timeouts_) + " "))
<< mStats.get();
}
}
@@ -311,31 +303,31 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
if (!mHaveHeader)
{
auto makeLedger = [&, this](Blob const& data) {
JLOG(m_journal.trace()) << "Ledger header found in fetch pack";
JLOG(journal_.trace()) << "Ledger header found in fetch pack";
mLedger = std::make_shared<Ledger>(
deserializePrefixedHeader(makeSlice(data)),
app_.config(),
mReason == Reason::SHARD ? *app_.getShardFamily()
: app_.getNodeFamily());
if (mLedger->info().hash != mHash ||
if (mLedger->info().hash != hash_ ||
(mSeq != 0 && mSeq != mLedger->info().seq))
{
// We know for a fact the ledger can never be acquired
JLOG(m_journal.warn())
<< "hash " << mHash << " seq " << std::to_string(mSeq)
JLOG(journal_.warn())
<< "hash " << hash_ << " seq " << std::to_string(mSeq)
<< " cannot be a ledger";
mLedger.reset();
mFailed = true;
failed_ = true;
}
};
// Try to fetch the ledger header from the DB
if (auto nodeObject = srcDB.fetchNodeObject(mHash, mSeq))
if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
{
JLOG(m_journal.trace()) << "Ledger header found in local store";
JLOG(journal_.trace()) << "Ledger header found in local store";
makeLedger(nodeObject->getData());
if (mFailed)
if (failed_)
return;
// Store the ledger header if the source and destination differ
@@ -344,25 +336,25 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
{
Blob blob{nodeObject->getData()};
dstDB.store(
hotLEDGER, std::move(blob), mHash, mLedger->info().seq);
hotLEDGER, std::move(blob), hash_, mLedger->info().seq);
}
}
else
{
// Try to fetch the ledger header from a fetch pack
auto data = app_.getLedgerMaster().getFetchPack(mHash);
auto data = app_.getLedgerMaster().getFetchPack(hash_);
if (!data)
return;
JLOG(m_journal.trace()) << "Ledger header found in fetch pack";
JLOG(journal_.trace()) << "Ledger header found in fetch pack";
makeLedger(*data);
if (mFailed)
if (failed_)
return;
// Store the ledger header in the ledger's database
mLedger->stateMap().family().db().store(
hotLEDGER, std::move(*data), mHash, mLedger->info().seq);
hotLEDGER, std::move(*data), hash_, mLedger->info().seq);
}
if (mSeq == 0)
@@ -376,7 +368,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
{
if (mLedger->info().txHash.isZero())
{
JLOG(m_journal.trace()) << "No TXNs to fetch";
JLOG(journal_.trace()) << "No TXNs to fetch";
mHaveTransactions = true;
}
else
@@ -388,7 +380,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
{
if (neededTxHashes(1, &filter).empty())
{
JLOG(m_journal.trace()) << "Had full txn map locally";
JLOG(journal_.trace()) << "Had full txn map locally";
mHaveTransactions = true;
}
}
@@ -399,9 +391,9 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
{
if (mLedger->info().accountHash.isZero())
{
JLOG(m_journal.fatal())
JLOG(journal_.fatal())
<< "We are acquiring a ledger with a zero account hash";
mFailed = true;
failed_ = true;
return;
}
AccountStateSF filter(
@@ -411,7 +403,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
{
if (neededStateHashes(1, &filter).empty())
{
JLOG(m_journal.trace()) << "Had full AS map locally";
JLOG(journal_.trace()) << "Had full AS map locally";
mHaveState = true;
}
}
@@ -419,8 +411,8 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
if (mHaveTransactions && mHaveState)
{
JLOG(m_journal.debug()) << "Had everything locally";
mComplete = true;
JLOG(journal_.debug()) << "Had everything locally";
complete_ = true;
mLedger->setImmutable(app_.config());
}
}
@@ -434,23 +426,23 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (isDone())
{
JLOG(m_journal.info()) << "Already done " << mHash;
JLOG(journal_.info()) << "Already done " << hash_;
return;
}
if (mTimeouts > ledgerTimeoutRetriesMax)
if (timeouts_ > ledgerTimeoutRetriesMax)
{
if (mSeq != 0)
{
JLOG(m_journal.warn())
<< mTimeouts << " timeouts for ledger " << mSeq;
JLOG(journal_.warn())
<< timeouts_ << " timeouts for ledger " << mSeq;
}
else
{
JLOG(m_journal.warn())
<< mTimeouts << " timeouts for ledger " << mHash;
JLOG(journal_.warn())
<< timeouts_ << " timeouts for ledger " << hash_;
}
mFailed = true;
failed_ = true;
done();
return;
}
@@ -462,8 +454,8 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
mByHash = true;
std::size_t pc = getPeerCount();
JLOG(m_journal.debug())
<< "No progress(" << pc << ") for ledger " << mHash;
JLOG(journal_.debug())
<< "No progress(" << pc << ") for ledger " << hash_;
// addPeers triggers if the reason is not HISTORY
// So if the reason IS HISTORY, need to trigger after we add
@@ -481,12 +473,18 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
void
InboundLedger::addPeers()
{
PeerSet::addPeers(
mPeerSet->addPeers(
(getPeerCount() == 0) ? peerCountStart : peerCountAdd,
[this](auto peer) { return peer->hasLedger(mHash, mSeq); });
[this](auto peer) { return peer->hasLedger(hash_, mSeq); },
[this](auto peer) {
// For historical nodes, do not trigger too soon
// since a fetch pack is probably coming
if (mReason != Reason::HISTORY)
trigger(peer, TriggerReason::added);
});
}
std::weak_ptr<PeerSet>
std::weak_ptr<TimeoutCounter>
InboundLedger::pmDowncast()
{
return shared_from_this();
@@ -501,16 +499,16 @@ InboundLedger::done()
mSignaled = true;
touch();
JLOG(m_journal.debug())
<< "Acquire " << mHash << (mFailed ? " fail " : " ")
<< ((mTimeouts == 0)
? std::string()
: (std::string("timeouts:") + std::to_string(mTimeouts) + " "))
<< mStats.get();
JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
<< ((timeouts_ == 0)
? std::string()
: (std::string("timeouts:") +
std::to_string(timeouts_) + " "))
<< mStats.get();
assert(mComplete || mFailed);
assert(complete_ || failed_);
if (mComplete && !mFailed && mLedger)
if (complete_ && !failed_ && mLedger)
{
mLedger->setImmutable(app_.config());
switch (mReason)
@@ -530,14 +528,14 @@ InboundLedger::done()
// We hold the PeerSet lock, so must dispatch
app_.getJobQueue().addJob(
jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()](Job&) {
if (self->mComplete && !self->mFailed)
if (self->complete_ && !self->failed_)
{
self->app_.getLedgerMaster().checkAccept(self->getLedger());
self->app_.getLedgerMaster().tryAdvance();
}
else
self->app_.getInboundLedgers().logFailure(
self->mHash, self->mSeq);
self->hash_, self->mSeq);
});
}
@@ -546,25 +544,25 @@ InboundLedger::done()
void
InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
if (isDone())
{
JLOG(m_journal.debug())
<< "Trigger on ledger: " << mHash << (mComplete ? " completed" : "")
<< (mFailed ? " failed" : "");
JLOG(journal_.debug())
<< "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
<< (failed_ ? " failed" : "");
return;
}
if (auto stream = m_journal.trace())
if (auto stream = journal_.trace())
{
if (peer)
stream << "Trigger acquiring ledger " << mHash << " from " << peer;
stream << "Trigger acquiring ledger " << hash_ << " from " << peer;
else
stream << "Trigger acquiring ledger " << mHash;
stream << "Trigger acquiring ledger " << hash_;
if (mComplete || mFailed)
stream << "complete=" << mComplete << " failed=" << mFailed;
if (complete_ || failed_)
stream << "complete=" << complete_ << " failed=" << failed_;
else
stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
@@ -575,23 +573,23 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
tryDB(
mReason == Reason::SHARD ? app_.getShardFamily()->db()
: app_.getNodeFamily().db());
if (mFailed)
if (failed_)
{
JLOG(m_journal.warn()) << " failed local for " << mHash;
JLOG(journal_.warn()) << " failed local for " << hash_;
return;
}
}
protocol::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_ledgerhash(hash_.begin(), hash_.size());
if (mTimeouts != 0)
if (timeouts_ != 0)
{
// Be more aggressive if we've timed out at least once
tmGL.set_querytype(protocol::qtINDIRECT);
if (!mProgress && !mFailed && mByHash &&
(mTimeouts > ledgerBecomeAggressiveThreshold))
if (!progress_ && !failed_ && mByHash &&
(timeouts_ > ledgerBecomeAggressiveThreshold))
{
auto need = getNeededHashes();
@@ -600,10 +598,10 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
protocol::TMGetObjectByHash tmBH;
bool typeSet = false;
tmBH.set_query(true);
tmBH.set_ledgerhash(mHash.begin(), mHash.size());
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
for (auto const& p : need)
{
JLOG(m_journal.warn()) << "Want: " << p.second;
JLOG(journal_.warn()) << "Want: " << p.second;
if (!typeSet)
{
@@ -622,38 +620,38 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
auto packet =
std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
for (auto id : mPeers)
{
if (auto p = app_.overlay().findPeerByShortID(id))
{
mByHash = false;
p->send(packet);
}
}
auto const& peerIds = mPeerSet->getPeerIds();
std::for_each(
peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
if (auto p = app_.overlay().findPeerByShortID(id))
{
mByHash = false;
p->send(packet);
}
});
}
else
{
JLOG(m_journal.info())
JLOG(journal_.info())
<< "getNeededHashes says acquire is complete";
mHaveHeader = true;
mHaveTransactions = true;
mHaveState = true;
mComplete = true;
complete_ = true;
}
}
}
// We can't do much without the header data because we don't know the
// state or transaction root hashes.
if (!mHaveHeader && !mFailed)
if (!mHaveHeader && !failed_)
{
tmGL.set_itype(protocol::liBASE);
if (mSeq != 0)
tmGL.set_ledgerseq(mSeq);
JLOG(m_journal.trace()) << "Sending header request to "
<< (peer ? "selected peer" : "all peers");
sendRequest(tmGL, peer);
JLOG(journal_.trace()) << "Sending header request to "
<< (peer ? "selected peer" : "all peers");
mPeerSet->sendRequest(tmGL, peer);
return;
}
@@ -675,22 +673,22 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
if (mHaveHeader && !mHaveState && !mFailed)
if (mHaveHeader && !mHaveState && !failed_)
{
assert(mLedger);
if (!mLedger->stateMap().isValid())
{
mFailed = true;
failed_ = true;
}
else if (mLedger->stateMap().getHash().isZero())
{
// we need the root node
tmGL.set_itype(protocol::liAS_NODE);
*tmGL.add_nodeids() = SHAMapNodeID().getRawString();
JLOG(m_journal.trace()) << "Sending AS root request to "
<< (peer ? "selected peer" : "all peers");
sendRequest(tmGL, peer);
JLOG(journal_.trace()) << "Sending AS root request to "
<< (peer ? "selected peer" : "all peers");
mPeerSet->sendRequest(tmGL, peer);
return;
}
else
@@ -705,18 +703,18 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
sl.lock();
// Make sure nothing happened while we released the lock
if (!mFailed && !mComplete && !mHaveState)
if (!failed_ && !complete_ && !mHaveState)
{
if (nodes.empty())
{
if (!mLedger->stateMap().isValid())
mFailed = true;
failed_ = true;
else
{
mHaveState = true;
if (mHaveTransactions)
mComplete = true;
complete_ = true;
}
}
else
@@ -731,38 +729,38 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
*(tmGL.add_nodeids()) = id.first.getRawString();
}
JLOG(m_journal.trace())
JLOG(journal_.trace())
<< "Sending AS node request (" << nodes.size()
<< ") to "
<< (peer ? "selected peer" : "all peers");
sendRequest(tmGL, peer);
mPeerSet->sendRequest(tmGL, peer);
return;
}
else
{
JLOG(m_journal.trace()) << "All AS nodes filtered";
JLOG(journal_.trace()) << "All AS nodes filtered";
}
}
}
}
}
if (mHaveHeader && !mHaveTransactions && !mFailed)
if (mHaveHeader && !mHaveTransactions && !failed_)
{
assert(mLedger);
if (!mLedger->txMap().isValid())
{
mFailed = true;
failed_ = true;
}
else if (mLedger->txMap().getHash().isZero())
{
// we need the root node
tmGL.set_itype(protocol::liTX_NODE);
*(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
JLOG(m_journal.trace()) << "Sending TX root request to "
<< (peer ? "selected peer" : "all peers");
sendRequest(tmGL, peer);
JLOG(journal_.trace()) << "Sending TX root request to "
<< (peer ? "selected peer" : "all peers");
mPeerSet->sendRequest(tmGL, peer);
return;
}
else
@@ -776,13 +774,13 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (nodes.empty())
{
if (!mLedger->txMap().isValid())
mFailed = true;
failed_ = true;
else
{
mHaveTransactions = true;
if (mHaveState)
mComplete = true;
complete_ = true;
}
}
else
@@ -796,25 +794,25 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
{
*(tmGL.add_nodeids()) = n.first.getRawString();
}
JLOG(m_journal.trace())
JLOG(journal_.trace())
<< "Sending TX node request (" << nodes.size()
<< ") to " << (peer ? "selected peer" : "all peers");
sendRequest(tmGL, peer);
mPeerSet->sendRequest(tmGL, peer);
return;
}
else
{
JLOG(m_journal.trace()) << "All TX nodes filtered";
JLOG(journal_.trace()) << "All TX nodes filtered";
}
}
}
}
if (mComplete || mFailed)
if (complete_ || failed_)
{
JLOG(m_journal.debug())
<< "Done:" << (mComplete ? " complete" : "")
<< (mFailed ? " failed " : " ") << mLedger->info().seq;
JLOG(journal_.debug())
<< "Done:" << (complete_ ? " complete" : "")
<< (failed_ ? " failed " : " ") << mLedger->info().seq;
sl.unlock();
done();
}
@@ -837,7 +835,7 @@ InboundLedger::filterNodes(
// to query everyone:
if (dup == nodes.begin())
{
JLOG(m_journal.trace()) << "filterNodes: all duplicates";
JLOG(journal_.trace()) << "filterNodes: all duplicates";
if (reason != TriggerReason::timeout)
{
@@ -847,7 +845,7 @@ InboundLedger::filterNodes(
}
else
{
JLOG(m_journal.trace()) << "filterNodes: pruning duplicates";
JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
nodes.erase(dup, nodes.end());
}
@@ -870,21 +868,21 @@ bool
InboundLedger::takeHeader(std::string const& data)
{
// Return value: true=normal, false=bad data
JLOG(m_journal.trace()) << "got header acquiring ledger " << mHash;
JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
if (mComplete || mFailed || mHaveHeader)
if (complete_ || failed_ || mHaveHeader)
return true;
auto* f = mReason == Reason::SHARD ? app_.getShardFamily()
: &app_.getNodeFamily();
mLedger = std::make_shared<Ledger>(
deserializeHeader(makeSlice(data)), app_.config(), *f);
if (mLedger->info().hash != mHash ||
if (mLedger->info().hash != hash_ ||
(mSeq != 0 && mSeq != mLedger->info().seq))
{
JLOG(m_journal.warn())
JLOG(journal_.warn())
<< "Acquire hash mismatch: " << mLedger->info().hash
<< "!=" << mHash;
<< "!=" << hash_;
mLedger.reset();
return false;
}
@@ -897,7 +895,7 @@ InboundLedger::takeHeader(std::string const& data)
Serializer s(data.size() + 4);
s.add32(HashPrefix::ledgerMaster);
s.addRaw(data.data(), data.size());
f->db().store(hotLEDGER, std::move(s.modData()), mHash, mSeq);
f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
if (mLedger->info().txHash.isZero())
mHaveTransactions = true;
@@ -919,19 +917,19 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
{
if (!mHaveHeader)
{
JLOG(m_journal.warn()) << "Missing ledger header";
JLOG(journal_.warn()) << "Missing ledger header";
san.incInvalid();
return;
}
if (packet.type() == protocol::liTX_NODE)
{
if (mHaveTransactions || mFailed)
if (mHaveTransactions || failed_)
{
san.incDuplicate();
return;
}
}
else if (mHaveState || mFailed)
else if (mHaveState || failed_)
{
san.incDuplicate();
return;
@@ -973,14 +971,14 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
if (!san.isGood())
{
JLOG(m_journal.warn()) << "Received bad node data";
JLOG(journal_.warn()) << "Received bad node data";
return;
}
}
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "Received bad node data: " << e.what();
JLOG(journal_.error()) << "Received bad node data: " << e.what();
san.incInvalid();
return;
}
@@ -994,7 +992,7 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
if (mHaveTransactions && mHaveState)
{
mComplete = true;
complete_ = true;
done();
}
}
@@ -1006,7 +1004,7 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
bool
InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
{
if (mFailed || mHaveState)
if (failed_ || mHaveState)
{
san.incDuplicate();
return true;
@@ -1031,7 +1029,7 @@ InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
bool
InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
{
if (mFailed || mHaveTransactions)
if (failed_ || mHaveTransactions)
{
san.incDuplicate();
return true;
@@ -1058,7 +1056,7 @@ InboundLedger::getNeededHashes()
if (!mHaveHeader)
{
ret.push_back(
std::make_pair(protocol::TMGetObjectByHash::otLEDGER, mHash));
std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
return ret;
}
@@ -1122,13 +1120,13 @@ InboundLedger::processData(
std::shared_ptr<Peer> peer,
protocol::TMLedgerData& packet)
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
if (packet.type() == protocol::liBASE)
{
if (packet.nodes_size() < 1)
{
JLOG(m_journal.warn()) << "Got empty header data";
JLOG(journal_.warn()) << "Got empty header data";
peer->charge(Resource::feeInvalidRequest);
return -1;
}
@@ -1141,7 +1139,7 @@ InboundLedger::processData(
{
if (!takeHeader(packet.nodes(0).nodedata()))
{
JLOG(m_journal.warn()) << "Got invalid header data";
JLOG(journal_.warn()) << "Got invalid header data";
peer->charge(Resource::feeInvalidRequest);
return -1;
}
@@ -1152,25 +1150,25 @@ InboundLedger::processData(
if (!mHaveState && (packet.nodes().size() > 1) &&
!takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
{
JLOG(m_journal.warn()) << "Included AS root invalid";
JLOG(journal_.warn()) << "Included AS root invalid";
}
if (!mHaveTransactions && (packet.nodes().size() > 2) &&
!takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
{
JLOG(m_journal.warn()) << "Included TX root invalid";
JLOG(journal_.warn()) << "Included TX root invalid";
}
}
catch (std::exception const& ex)
{
JLOG(m_journal.warn())
JLOG(journal_.warn())
<< "Included AS/TX root invalid: " << ex.what();
peer->charge(Resource::feeBadData);
return -1;
}
if (san.isUseful())
mProgress = true;
progress_ = true;
mStats += san;
return san.getGood();
@@ -1181,7 +1179,7 @@ InboundLedger::processData(
{
if (packet.nodes().size() == 0)
{
JLOG(m_journal.info()) << "Got response with no nodes";
JLOG(journal_.info()) << "Got response with no nodes";
peer->charge(Resource::feeInvalidRequest);
return -1;
}
@@ -1191,7 +1189,7 @@ InboundLedger::processData(
{
if (!node.has_nodeid() || !node.has_nodedata())
{
JLOG(m_journal.warn()) << "Got bad node";
JLOG(journal_.warn()) << "Got bad node";
peer->charge(Resource::feeInvalidRequest);
return -1;
}
@@ -1202,15 +1200,15 @@ InboundLedger::processData(
if (packet.type() == protocol::liTX_NODE)
{
JLOG(m_journal.debug()) << "Ledger TX node stats: " << san.get();
JLOG(journal_.debug()) << "Ledger TX node stats: " << san.get();
}
else
{
JLOG(m_journal.debug()) << "Ledger AS node stats: " << san.get();
JLOG(journal_.debug()) << "Ledger AS node stats: " << san.get();
}
if (san.isUseful())
mProgress = true;
progress_ = true;
mStats += san;
return san.getGood();
@@ -1270,18 +1268,18 @@ InboundLedger::getJson(int)
{
Json::Value ret(Json::objectValue);
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
ret[jss::hash] = to_string(mHash);
ret[jss::hash] = to_string(hash_);
if (mComplete)
if (complete_)
ret[jss::complete] = true;
if (mFailed)
if (failed_)
ret[jss::failed] = true;
if (!mComplete && !mFailed)
ret[jss::peers] = static_cast<int>(mPeers.size());
if (!complete_ && !failed_)
ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
ret[jss::have_header] = mHaveHeader;
@@ -1291,7 +1289,7 @@ InboundLedger::getJson(int)
ret[jss::have_transactions] = mHaveTransactions;
}
ret[jss::timeouts] = mTimeouts;
ret[jss::timeouts] = timeouts_;
if (mHaveHeader && !mHaveState)
{

View File

@@ -50,7 +50,8 @@ public:
Application& app,
clock_type& clock,
Stoppable& parent,
beast::insight::Collector::ptr const& collector)
beast::insight::Collector::ptr const& collector,
std::unique_ptr<PeerSetBuilder> peerSetBuilder)
: Stoppable("InboundLedgers", parent)
, app_(app)
, fetchRate_(clock.now())
@@ -58,6 +59,7 @@ public:
, m_clock(clock)
, mRecentFailures(clock)
, mCounter(collector->make_counter("ledger_fetches"))
, mPeerSetBuilder(std::move(peerSetBuilder))
{
}
@@ -88,7 +90,12 @@ public:
else
{
inbound = std::make_shared<InboundLedger>(
app_, hash, seq, reason, std::ref(m_clock));
app_,
hash,
seq,
reason,
std::ref(m_clock),
mPeerSetBuilder->build());
mLedgers.emplace(hash, inbound);
inbound->init(sl);
++mCounter;
@@ -404,6 +411,8 @@ private:
beast::aged_map<uint256, std::uint32_t> mRecentFailures;
beast::insight::Counter mCounter;
std::unique_ptr<PeerSetBuilder> mPeerSetBuilder;
};
//------------------------------------------------------------------------------
@@ -415,7 +424,8 @@ make_InboundLedgers(
Stoppable& parent,
beast::insight::Collector::ptr const& collector)
{
return std::make_unique<InboundLedgersImp>(app, clock, parent, collector);
return std::make_unique<InboundLedgersImp>(
app, clock, parent, collector, make_PeerSetBuilder(app));
}
} // namespace ripple

View File

@@ -67,12 +67,14 @@ public:
Application& app,
Stoppable& parent,
beast::insight::Collector::ptr const& collector,
std::function<void(std::shared_ptr<SHAMap> const&, bool)> gotSet)
std::function<void(std::shared_ptr<SHAMap> const&, bool)> gotSet,
std::unique_ptr<PeerSetBuilder> peerSetBuilder)
: Stoppable("InboundTransactions", parent)
, app_(app)
, m_seq(0)
, m_zeroSet(m_map[uint256()])
, m_gotSet(std::move(gotSet))
, m_peerSetBuilder(std::move(peerSetBuilder))
{
m_zeroSet.mSet = std::make_shared<SHAMap>(
SHAMapType::TRANSACTION, uint256(), app_.getNodeFamily());
@@ -119,7 +121,8 @@ public:
if (!acquire || isStopping())
return std::shared_ptr<SHAMap>();
ta = std::make_shared<TransactionAcquire>(app_, hash);
ta = std::make_shared<TransactionAcquire>(
app_, hash, m_peerSetBuilder->build());
auto& obj = m_map[hash];
obj.mAcquire = ta;
@@ -260,6 +263,8 @@ private:
InboundTransactionSet& m_zeroSet;
std::function<void(std::shared_ptr<SHAMap> const&, bool)> m_gotSet;
std::unique_ptr<PeerSetBuilder> m_peerSetBuilder;
};
//------------------------------------------------------------------------------
@@ -274,7 +279,7 @@ make_InboundTransactions(
std::function<void(std::shared_ptr<SHAMap> const&, bool)> gotSet)
{
return std::make_unique<InboundTransactionsImp>(
app, parent, collector, std::move(gotSet));
app, parent, collector, std::move(gotSet), make_PeerSetBuilder(app));
}
} // namespace ripple

View File

@@ -0,0 +1,279 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/ledger/BuildLedger.h>
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/app/ledger/LedgerReplay.h>
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
#include <ripple/app/main/Application.h>
#include <ripple/core/JobQueue.h>
#include <ripple/overlay/PeerSet.h>
namespace ripple {
LedgerDeltaAcquire::LedgerDeltaAcquire(
Application& app,
InboundLedgers& inboundLedgers,
uint256 const& ledgerHash,
std::uint32_t ledgerSeq,
std::unique_ptr<PeerSet> peerSet)
: TimeoutCounter(
app,
ledgerHash,
LedgerReplayParameters::SUB_TASK_TIMEOUT,
{jtREPLAY_TASK,
"LedgerReplayDelta",
LedgerReplayParameters::MAX_QUEUED_TASKS},
app.journal("LedgerReplayDelta"))
, inboundLedgers_(inboundLedgers)
, ledgerSeq_(ledgerSeq)
, peerSet_(std::move(peerSet))
{
JLOG(journal_.trace()) << "Create " << hash_ << " Seq " << ledgerSeq;
}
LedgerDeltaAcquire::~LedgerDeltaAcquire()
{
JLOG(journal_.trace()) << "Destroy " << hash_;
}
void
LedgerDeltaAcquire::init(int numPeers)
{
ScopedLockType sl(mtx_);
if (!isDone())
{
trigger(numPeers, sl);
setTimer(sl);
}
}
void
LedgerDeltaAcquire::trigger(std::size_t limit, ScopedLockType& sl)
{
fullLedger_ = app_.getLedgerMaster().getLedgerByHash(hash_);
if (fullLedger_)
{
complete_ = true;
JLOG(journal_.trace()) << "existing ledger " << hash_;
notify(sl);
return;
}
if (!fallBack_)
{
peerSet_->addPeers(
limit,
[this](auto peer) {
return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
peer->hasLedger(hash_, ledgerSeq_);
},
[this](auto peer) {
if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
{
JLOG(journal_.trace())
<< "Add a peer " << peer->id() << " for " << hash_;
protocol::TMReplayDeltaRequest request;
request.set_ledgerhash(hash_.data(), hash_.size());
peerSet_->sendRequest(request, peer);
}
else
{
if (++noFeaturePeerCount >=
LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
{
JLOG(journal_.debug()) << "Fall back for " << hash_;
timerInterval_ =
LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT;
fallBack_ = true;
}
}
});
}
if (fallBack_)
inboundLedgers_.acquire(
hash_, ledgerSeq_, InboundLedger::Reason::GENERIC);
}
void
LedgerDeltaAcquire::onTimer(bool progress, ScopedLockType& sl)
{
JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
if (timeouts_ > LedgerReplayParameters::SUB_TASK_MAX_TIMEOUTS)
{
failed_ = true;
JLOG(journal_.debug()) << "too many timeouts " << hash_;
notify(sl);
}
else
{
trigger(1, sl);
}
}
std::weak_ptr<TimeoutCounter>
LedgerDeltaAcquire::pmDowncast()
{
return shared_from_this();
}
void
LedgerDeltaAcquire::processData(
LedgerInfo const& info,
std::map<std::uint32_t, std::shared_ptr<STTx const>>&& orderedTxns)
{
ScopedLockType sl(mtx_);
JLOG(journal_.trace()) << "got data for " << hash_;
if (isDone())
return;
if (info.seq == ledgerSeq_)
{
// create a temporary ledger for building a LedgerReplay object later
replayTemp_ =
std::make_shared<Ledger>(info, app_.config(), app_.getNodeFamily());
if (replayTemp_)
{
complete_ = true;
orderedTxns_ = std::move(orderedTxns);
JLOG(journal_.debug()) << "ready to replay " << hash_;
notify(sl);
return;
}
}
failed_ = true;
JLOG(journal_.error())
<< "failed to create a (info only) ledger from verified data " << hash_;
notify(sl);
}
void
LedgerDeltaAcquire::addDataCallback(
InboundLedger::Reason reason,
OnDeltaDataCB&& cb)
{
ScopedLockType sl(mtx_);
dataReadyCallbacks_.emplace_back(std::move(cb));
if (reasons_.count(reason) == 0)
{
reasons_.emplace(reason);
if (fullLedger_)
onLedgerBuilt(sl, reason);
}
if (isDone())
{
JLOG(journal_.debug())
<< "task added to a finished LedgerDeltaAcquire " << hash_;
notify(sl);
}
}
std::shared_ptr<Ledger const>
LedgerDeltaAcquire::tryBuild(std::shared_ptr<Ledger const> const& parent)
{
ScopedLockType sl(mtx_);
if (fullLedger_)
return fullLedger_;
if (failed_ || !complete_ || !replayTemp_)
return {};
assert(parent->seq() + 1 == replayTemp_->seq());
assert(parent->info().hash == replayTemp_->info().parentHash);
// build ledger
LedgerReplay replayData(parent, replayTemp_, std::move(orderedTxns_));
fullLedger_ = buildLedger(replayData, tapNONE, app_, journal_);
if (fullLedger_ && fullLedger_->info().hash == hash_)
{
JLOG(journal_.info()) << "Built " << hash_;
onLedgerBuilt(sl);
return fullLedger_;
}
else
{
failed_ = true;
complete_ = false;
JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
<< parent->info().hash;
Throw<std::runtime_error>("Cannot replay ledger");
}
}
void
LedgerDeltaAcquire::onLedgerBuilt(
ScopedLockType& sl,
std::optional<InboundLedger::Reason> reason)
{
JLOG(journal_.debug()) << "onLedgerBuilt " << hash_
<< (reason ? " for a new reason" : "");
std::vector<InboundLedger::Reason> reasons(
reasons_.begin(), reasons_.end());
bool firstTime = true;
if (reason) // small chance
{
reasons.clear();
reasons.push_back(*reason);
firstTime = false;
}
app_.getJobQueue().addJob(
jtREPLAY_TASK,
"onLedgerBuilt",
[=, ledger = this->fullLedger_, &app = this->app_](Job&) {
for (auto reason : reasons)
{
switch (reason)
{
case InboundLedger::Reason::GENERIC:
app.getLedgerMaster().storeLedger(ledger);
break;
default:
// TODO for other use cases
break;
}
}
if (firstTime)
app.getLedgerMaster().tryAdvance();
});
}
void
LedgerDeltaAcquire::notify(ScopedLockType& sl)
{
assert(isDone());
std::vector<OnDeltaDataCB> toCall;
std::swap(toCall, dataReadyCallbacks_);
auto const good = !failed_;
sl.unlock();
for (auto& cb : toCall)
{
cb(good, hash_);
}
sl.lock();
}
} // namespace ripple

View File

@@ -0,0 +1,172 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_LEDGERDELTAACQUIRE_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERDELTAACQUIRE_H_INCLUDED
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/impl/TimeoutCounter.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/base_uint.h>
#include <list>
#include <map>
namespace ripple {
class InboundLedgers;
class PeerSet;
namespace test {
class LedgerReplayClient;
} // namespace test
/**
* Manage the retrieval of a ledger delta (header and transactions)
* from the network. Before asking peers, always check if the local
* node has the ledger.
*/
class LedgerDeltaAcquire final
: public TimeoutCounter,
public std::enable_shared_from_this<LedgerDeltaAcquire>,
public CountedObject<LedgerDeltaAcquire>
{
public:
/**
* A callback used to notify that the delta's data is ready or failed.
* @param successful if the ledger delta data was acquired successfully
* @param hash hash of the ledger to build
*/
using OnDeltaDataCB =
std::function<void(bool successful, uint256 const& hash)>;
/**
* Constructor
* @param app Application reference
* @param inboundLedgers InboundLedgers reference
* @param ledgerHash hash of the ledger
* @param ledgerSeq sequence number of the ledger
* @param peerSet manage a set of peers that we will ask for the ledger
*/
LedgerDeltaAcquire(
Application& app,
InboundLedgers& inboundLedgers,
uint256 const& ledgerHash,
std::uint32_t ledgerSeq,
std::unique_ptr<PeerSet> peerSet);
~LedgerDeltaAcquire() override;
/**
* Start the LedgerDeltaAcquire task
* @param numPeers number of peers to try initially
*/
void
init(int numPeers);
/**
* Process the data extracted from a peer's reply
* @param info info (header) of the ledger
* @param orderedTxns set of Txns of the ledger
*
* @note info and Txns must have been verified against the ledger hash
*/
void
processData(
LedgerInfo const& info,
std::map<std::uint32_t, std::shared_ptr<STTx const>>&& orderedTxns);
/**
* Try to build the ledger if not already
* @param parent parent ledger
* @return the ledger if built, nullptr otherwise (e.g. waiting for peers'
* replies of the ledger info (header) and Txns.)
* @note may throw runtime_error if the replay failed due to data error
*/
std::shared_ptr<Ledger const>
tryBuild(std::shared_ptr<Ledger const> const& parent);
/**
* Add a reason and a callback to the LedgerDeltaAcquire subtask.
* The reason is used to process the ledger once it is replayed.
* The callback is called when the delta's data is ready or failed
* @note the callback will be called once and only once unless this object
* is destructed before the call.
*/
void
addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB&& cb);
static char const*
getCountedObjectName()
{
return "LedgerDeltaAcquire";
}
private:
void
onTimer(bool progress, ScopedLockType& peerSetLock) override;
std::weak_ptr<TimeoutCounter>
pmDowncast() override;
/**
* Trigger another round
* @param limit number of new peers to send the request
* @param sl lock. this function must be called with the lock
*/
void
trigger(std::size_t limit, ScopedLockType& sl);
/**
* Process a newly built ledger, such as store it.
* @param sl lock. this function must be called with the lock
* @param reason specific new reason if any
* @note this function should be called (1) when the ledger is built the
* first time, and (2) when a LedgerReplayTask with a new reason
* is added.
*/
void
onLedgerBuilt(
ScopedLockType& sl,
std::optional<InboundLedger::Reason> reason = {});
/**
* Call the OnDeltaDataCB callbacks
* @param sl lock. this function must be called with the lock
*/
void
notify(ScopedLockType& sl);
InboundLedgers& inboundLedgers_;
std::uint32_t const ledgerSeq_;
std::unique_ptr<PeerSet> peerSet_;
std::shared_ptr<Ledger const> replayTemp_ = {};
std::shared_ptr<Ledger const> fullLedger_ = {};
std::map<std::uint32_t, std::shared_ptr<STTx const>> orderedTxns_;
std::vector<OnDeltaDataCB> dataReadyCallbacks_;
std::set<InboundLedger::Reason> reasons_;
std::uint32_t noFeaturePeerCount = 0;
bool fallBack_ = false;
friend class LedgerReplayTask; // for asserts only
friend class test::LedgerReplayClient;
};
} // namespace ripple
#endif

View File

@@ -20,6 +20,7 @@
#include <ripple/app/consensus/RCLValidations.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/ledger/OrderBookDB.h>
#include <ripple/app/ledger/PendingSaves.h>
@@ -1393,10 +1394,13 @@ LedgerMaster::findNewLedgersToPublish(
ledger = mLedgerHistory.getLedgerByHash(*hash);
}
// Can we try to acquire the ledger we need?
if (!ledger && (++acqCount < ledger_fetch_size_))
ledger = app_.getInboundLedgers().acquire(
*hash, seq, InboundLedger::Reason::GENERIC);
if (!app_.config().LEDGER_REPLAY)
{
// Can we try to acquire the ledger we need?
if (!ledger && (++acqCount < ledger_fetch_size_))
ledger = app_.getInboundLedgers().acquire(
*hash, seq, InboundLedger::Reason::GENERIC);
}
// Did we acquire the next ledger we need to publish?
if (ledger && (ledger->info().seq == pubSeq))
@@ -1416,6 +1420,43 @@ LedgerMaster::findNewLedgersToPublish(
<< "Exception while trying to find ledgers to publish.";
}
if (app_.config().LEDGER_REPLAY)
{
/* Narrow down the gap of ledgers, and try to replay them.
* When replaying a ledger gap, if the local node has
* the start ledger, it saves an expensive InboundLedger
* acquire. If the local node has the finish ledger, it
* saves a skip list acquire.
*/
auto const& startLedger = ret.empty() ? mPubLedger : ret.back();
auto finishLedger = valLedger;
while (startLedger->seq() + 1 < finishLedger->seq())
{
if (auto const parent = mLedgerHistory.getLedgerByHash(
finishLedger->info().parentHash);
parent)
{
finishLedger = parent;
}
else
{
auto numberLedgers =
finishLedger->seq() - startLedger->seq() + 1;
JLOG(m_journal.debug())
<< "Publish LedgerReplays " << numberLedgers
<< " ledgers, from seq=" << startLedger->info().seq << ", "
<< startLedger->info().hash
<< " to seq=" << finishLedger->info().seq << ", "
<< finishLedger->info().hash;
app_.getLedgerReplayer().replay(
InboundLedger::Reason::GENERIC,
finishLedger->info().hash,
numberLedgers);
break;
}
}
}
return ret;
}

View File

@@ -35,4 +35,14 @@ LedgerReplay::LedgerReplay(
}
}
LedgerReplay::LedgerReplay(
std::shared_ptr<Ledger const> parent,
std::shared_ptr<Ledger const> replay,
std::map<std::uint32_t, std::shared_ptr<STTx const>>&& orderedTxns)
: parent_{std::move(parent)}
, replay_{std::move(replay)}
, orderedTxns_{std::move(orderedTxns)}
{
}
} // namespace ripple

View File

@@ -0,0 +1,293 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/impl/LedgerReplayMsgHandler.h>
#include <ripple/app/main/Application.h>
#include <memory>
namespace ripple {
LedgerReplayMsgHandler::LedgerReplayMsgHandler(
Application& app,
LedgerReplayer& replayer)
: app_(app)
, replayer_(replayer)
, journal_(app.journal("LedgerReplayMsgHandler"))
{
}
protocol::TMProofPathResponse
LedgerReplayMsgHandler::processProofPathRequest(
std::shared_ptr<protocol::TMProofPathRequest> const& msg)
{
protocol::TMProofPathRequest& packet = *msg;
protocol::TMProofPathResponse reply;
if (!packet.has_key() || !packet.has_ledgerhash() || !packet.has_type() ||
packet.ledgerhash().size() != uint256::size() ||
packet.key().size() != uint256::size() ||
!protocol::TMLedgerMapType_IsValid(packet.type()))
{
JLOG(journal_.debug()) << "getProofPath: Invalid request";
reply.set_error(protocol::TMReplyError::reBAD_REQUEST);
return reply;
}
reply.set_key(packet.key());
reply.set_ledgerhash(packet.ledgerhash());
reply.set_type(packet.type());
uint256 const key(packet.key());
uint256 const ledgerHash(packet.ledgerhash());
auto ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
if (!ledger)
{
JLOG(journal_.debug())
<< "getProofPath: Don't have ledger " << ledgerHash;
reply.set_error(protocol::TMReplyError::reNO_LEDGER);
return reply;
}
auto const path = [&]() -> std::optional<std::vector<Blob>> {
switch (packet.type())
{
case protocol::lmACCOUNT_STATE:
return ledger->stateMap().getProofPath(key);
case protocol::lmTRANASCTION:
return ledger->txMap().getProofPath(key);
default:
// should not be here
// because already tested with TMLedgerMapType_IsValid()
return {};
}
}();
if (!path)
{
JLOG(journal_.debug()) << "getProofPath: Don't have the node " << key
<< " of ledger " << ledgerHash;
reply.set_error(protocol::TMReplyError::reNO_NODE);
return reply;
}
// pack header
Serializer nData(128);
addRaw(ledger->info(), nData);
reply.set_ledgerheader(nData.getDataPtr(), nData.getLength());
// pack path
for (auto const& b : *path)
reply.add_path(b.data(), b.size());
JLOG(journal_.debug()) << "getProofPath for the node " << key
<< " of ledger " << ledgerHash << " path length "
<< path->size();
return reply;
}
bool
LedgerReplayMsgHandler::processProofPathResponse(
std::shared_ptr<protocol::TMProofPathResponse> const& msg)
{
protocol::TMProofPathResponse& reply = *msg;
if (reply.has_error() || !reply.has_key() || !reply.has_ledgerhash() ||
!reply.has_type() || !reply.has_ledgerheader() ||
reply.path_size() == 0)
{
JLOG(journal_.debug()) << "Bad message: Error reply";
return false;
}
if (reply.type() != protocol::lmACCOUNT_STATE)
{
JLOG(journal_.debug())
<< "Bad message: we only support the state ShaMap for now";
return false;
}
// deserialize the header
auto info = deserializeHeader(
{reply.ledgerheader().data(), reply.ledgerheader().size()});
uint256 replyHash(reply.ledgerhash());
if (calculateLedgerHash(info) != replyHash)
{
JLOG(journal_.debug()) << "Bad message: Hash mismatch";
return false;
}
info.hash = replyHash;
uint256 key(reply.key());
if (key != keylet::skip().key)
{
JLOG(journal_.debug())
<< "Bad message: we only support the short skip list for now. "
"Key in reply "
<< key;
return false;
}
// verify the skip list
std::vector<Blob> path;
path.reserve(reply.path_size());
for (int i = 0; i < reply.path_size(); ++i)
{
path.emplace_back(reply.path(i).begin(), reply.path(i).end());
}
if (!SHAMap::verifyProofPath(info.accountHash, key, path))
{
JLOG(journal_.debug()) << "Bad message: Proof path verify failed";
return false;
}
// deserialize the SHAMapItem
auto node = SHAMapTreeNode::makeFromWire(makeSlice(path.front()));
if (!node || !node->isLeaf())
{
JLOG(journal_.debug()) << "Bad message: Cannot deserialize";
return false;
}
auto item = static_cast<SHAMapLeafNode*>(node.get())->peekItem();
if (!item)
{
JLOG(journal_.debug()) << "Bad message: Cannot get ShaMapItem";
return false;
}
replayer_.gotSkipList(info, item);
return true;
}
protocol::TMReplayDeltaResponse
LedgerReplayMsgHandler::processReplayDeltaRequest(
std::shared_ptr<protocol::TMReplayDeltaRequest> const& msg)
{
protocol::TMReplayDeltaRequest& packet = *msg;
protocol::TMReplayDeltaResponse reply;
if (!packet.has_ledgerhash() ||
packet.ledgerhash().size() != uint256::size())
{
JLOG(journal_.debug()) << "getReplayDelta: Invalid request";
reply.set_error(protocol::TMReplyError::reBAD_REQUEST);
return reply;
}
reply.set_ledgerhash(packet.ledgerhash());
uint256 const ledgerHash{packet.ledgerhash()};
auto ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
if (!ledger || !ledger->isImmutable())
{
JLOG(journal_.debug())
<< "getReplayDelta: Don't have ledger " << ledgerHash;
reply.set_error(protocol::TMReplyError::reNO_LEDGER);
return reply;
}
// pack header
Serializer nData(128);
addRaw(ledger->info(), nData);
reply.set_ledgerheader(nData.getDataPtr(), nData.getLength());
// pack transactions
auto const& txMap = ledger->txMap();
txMap.visitLeaves([&](std::shared_ptr<SHAMapItem const> const& txNode) {
reply.add_transaction(txNode->data(), txNode->size());
});
JLOG(journal_.debug()) << "getReplayDelta for ledger " << ledgerHash
<< " txMap hash " << txMap.getHash().as_uint256();
return reply;
}
bool
LedgerReplayMsgHandler::processReplayDeltaResponse(
std::shared_ptr<protocol::TMReplayDeltaResponse> const& msg)
{
protocol::TMReplayDeltaResponse& reply = *msg;
if (reply.has_error() || !reply.has_ledgerheader())
{
JLOG(journal_.debug()) << "Bad message: Error reply";
return false;
}
auto info = deserializeHeader(
{reply.ledgerheader().data(), reply.ledgerheader().size()});
uint256 replyHash(reply.ledgerhash());
if (calculateLedgerHash(info) != replyHash)
{
JLOG(journal_.debug()) << "Bad message: Hash mismatch";
return false;
}
info.hash = replyHash;
auto numTxns = reply.transaction_size();
std::map<std::uint32_t, std::shared_ptr<STTx const>> orderedTxns;
SHAMap txMap(SHAMapType::TRANSACTION, app_.getNodeFamily());
try
{
for (int i = 0; i < numTxns; ++i)
{
// deserialize:
// -- TxShaMapItem for building a ShaMap for verification
// -- Tx
// -- TxMetaData for Tx ordering
Serializer shaMapItemData(
reply.transaction(i).data(), reply.transaction(i).size());
SerialIter txMetaSit(makeSlice(reply.transaction(i)));
SerialIter txSit(txMetaSit.getSlice(txMetaSit.getVLDataLength()));
SerialIter metaSit(txMetaSit.getSlice(txMetaSit.getVLDataLength()));
auto tx = std::make_shared<STTx const>(txSit);
if (!tx)
{
JLOG(journal_.debug()) << "Bad message: Cannot deserialize";
return false;
}
auto tid = tx->getTransactionID();
STObject meta(metaSit, sfMetadata);
orderedTxns.emplace(meta[sfTransactionIndex], std::move(tx));
auto item = std::make_shared<SHAMapItem const>(
tid, std::move(shaMapItemData));
if (!item ||
!txMap.addGiveItem(SHAMapNodeType::tnTRANSACTION_MD, item))
{
JLOG(journal_.debug()) << "Bad message: Cannot deserialize";
return false;
}
}
}
catch (std::exception const&)
{
JLOG(journal_.debug()) << "Bad message: Cannot deserialize";
return false;
}
if (txMap.getHash().as_uint256() != info.txHash)
{
JLOG(journal_.debug()) << "Bad message: Transactions verify failed";
return false;
}
replayer_.gotReplayDelta(info, std::move(orderedTxns));
return true;
}
} // namespace ripple

View File

@@ -0,0 +1,78 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_LEDGERREPLAYMSGHANDLER_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERREPLAYMSGHANDLER_H_INCLUDED
#include <ripple/beast/utility/Journal.h>
#include <ripple/protocol/messages.h>
namespace ripple {
class Application;
class LedgerReplayer;
class LedgerReplayMsgHandler final
{
public:
LedgerReplayMsgHandler(Application& app, LedgerReplayer& replayer);
~LedgerReplayMsgHandler() = default;
/**
* Process TMProofPathRequest and return TMProofPathResponse
* @note check has_error() and error() of the response for error
*/
protocol::TMProofPathResponse
processProofPathRequest(
std::shared_ptr<protocol::TMProofPathRequest> const& msg);
/**
* Process TMProofPathResponse
* @return false if the response message has bad format or bad data;
* true otherwise
*/
bool
processProofPathResponse(
std::shared_ptr<protocol::TMProofPathResponse> const& msg);
/**
* Process TMReplayDeltaRequest and return TMReplayDeltaResponse
* @note check has_error() and error() of the response for error
*/
protocol::TMReplayDeltaResponse
processReplayDeltaRequest(
std::shared_ptr<protocol::TMReplayDeltaRequest> const& msg);
/**
* Process TMReplayDeltaResponse
* @return false if the response message has bad format or bad data;
* true otherwise
*/
bool
processReplayDeltaResponse(
std::shared_ptr<protocol::TMReplayDeltaResponse> const& msg);
private:
Application& app_;
LedgerReplayer& replayer_;
beast::Journal journal_;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,306 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerReplayTask.h>
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
#include <ripple/app/ledger/impl/SkipListAcquire.h>
#include <ripple/core/JobQueue.h>
namespace ripple {
LedgerReplayTask::TaskParameter::TaskParameter(
InboundLedger::Reason r,
uint256 const& finishLedgerHash,
std::uint32_t totalNumLedgers)
: reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
{
assert(finishLedgerHash.isNonZero() && totalNumLedgers > 0);
}
bool
LedgerReplayTask::TaskParameter::update(
uint256 const& hash,
std::uint32_t seq,
std::vector<uint256> const& sList)
{
if (finishHash_ != hash || sList.size() + 1 < totalLedgers_ || full_)
return false;
finishSeq_ = seq;
skipList_ = sList;
skipList_.emplace_back(finishHash_);
startHash_ = skipList_[skipList_.size() - totalLedgers_];
assert(startHash_.isNonZero());
startSeq_ = finishSeq_ - totalLedgers_ + 1;
full_ = true;
return true;
}
bool
LedgerReplayTask::TaskParameter::canMergeInto(
TaskParameter const& existingTask) const
{
if (reason_ == existingTask.reason_)
{
if (finishHash_ == existingTask.finishHash_ &&
totalLedgers_ <= existingTask.totalLedgers_)
{
return true;
}
if (existingTask.full_)
{
auto const& exList = existingTask.skipList_;
if (auto i = std::find(exList.begin(), exList.end(), finishHash_);
i != exList.end())
{
return existingTask.totalLedgers_ >=
totalLedgers_ + (exList.end() - i) - 1;
}
}
}
return false;
}
LedgerReplayTask::LedgerReplayTask(
Application& app,
InboundLedgers& inboundLedgers,
LedgerReplayer& replayer,
std::shared_ptr<SkipListAcquire>& skipListAcquirer,
TaskParameter&& parameter)
: TimeoutCounter(
app,
parameter.finishHash_,
LedgerReplayParameters::TASK_TIMEOUT,
{jtREPLAY_TASK,
"LedgerReplayTask",
LedgerReplayParameters::MAX_QUEUED_TASKS},
app.journal("LedgerReplayTask"))
, inboundLedgers_(inboundLedgers)
, replayer_(replayer)
, parameter_(parameter)
, maxTimeouts_(std::max(
LedgerReplayParameters::TASK_MAX_TIMEOUTS_MINIMUM,
parameter.totalLedgers_ *
LedgerReplayParameters::TASK_MAX_TIMEOUTS_MULTIPLIER))
, skipListAcquirer_(skipListAcquirer)
{
JLOG(journal_.trace()) << "Create " << hash_;
}
LedgerReplayTask::~LedgerReplayTask()
{
JLOG(journal_.trace()) << "Destroy " << hash_;
}
void
LedgerReplayTask::init()
{
JLOG(journal_.debug()) << "Task start " << hash_;
std::weak_ptr<LedgerReplayTask> wptr = shared_from_this();
skipListAcquirer_->addDataCallback([wptr](bool good, uint256 const& hash) {
if (auto sptr = wptr.lock(); sptr)
{
if (!good)
{
sptr->cancel();
}
else
{
auto const skipListData = sptr->skipListAcquirer_->getData();
sptr->updateSkipList(
hash, skipListData->ledgerSeq, skipListData->skipList);
}
}
});
ScopedLockType sl(mtx_);
if (!isDone())
{
trigger(sl);
setTimer(sl);
}
}
void
LedgerReplayTask::trigger(ScopedLockType& sl)
{
JLOG(journal_.trace()) << "trigger " << hash_;
if (!parameter_.full_)
return;
if (!parent_)
{
parent_ = app_.getLedgerMaster().getLedgerByHash(parameter_.startHash_);
if (!parent_)
{
parent_ = inboundLedgers_.acquire(
parameter_.startHash_,
parameter_.startSeq_,
InboundLedger::Reason::GENERIC);
}
if (parent_)
{
JLOG(journal_.trace())
<< "Got start ledger " << parameter_.startHash_ << " for task "
<< hash_;
}
}
tryAdvance(sl);
}
void
LedgerReplayTask::deltaReady(uint256 const& deltaHash)
{
JLOG(journal_.trace()) << "Delta " << deltaHash << " ready for task "
<< hash_;
ScopedLockType sl(mtx_);
if (!isDone())
tryAdvance(sl);
}
void
LedgerReplayTask::tryAdvance(ScopedLockType& sl)
{
JLOG(journal_.trace()) << "tryAdvance task " << hash_
<< (parameter_.full_ ? ", full parameter"
: ", waiting to fill parameter")
<< ", deltaIndex=" << deltaToBuild_
<< ", totalDeltas=" << deltas_.size() << ", parent "
<< (parent_ ? parent_->info().hash : uint256());
bool shouldTry = parent_ && parameter_.full_ &&
parameter_.totalLedgers_ - 1 == deltas_.size();
if (!shouldTry)
return;
try
{
for (; deltaToBuild_ < deltas_.size(); ++deltaToBuild_)
{
auto& delta = deltas_[deltaToBuild_];
assert(parent_->seq() + 1 == delta->ledgerSeq_);
if (auto l = delta->tryBuild(parent_); l)
{
JLOG(journal_.debug())
<< "Task " << hash_ << " got ledger " << l->info().hash
<< " deltaIndex=" << deltaToBuild_
<< " totalDeltas=" << deltas_.size();
parent_ = l;
}
else
return;
}
complete_ = true;
JLOG(journal_.info()) << "Completed " << hash_;
}
catch (std::runtime_error const&)
{
failed_ = true;
}
}
void
LedgerReplayTask::updateSkipList(
uint256 const& hash,
std::uint32_t seq,
std::vector<uint256> const& sList)
{
{
ScopedLockType sl(mtx_);
if (isDone())
return;
if (!parameter_.update(hash, seq, sList))
{
JLOG(journal_.error()) << "Parameter update failed " << hash_;
failed_ = true;
return;
}
}
replayer_.createDeltas(shared_from_this());
ScopedLockType sl(mtx_);
if (!isDone())
trigger(sl);
}
void
LedgerReplayTask::onTimer(bool progress, ScopedLockType& sl)
{
JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
if (timeouts_ > maxTimeouts_)
{
failed_ = true;
JLOG(journal_.debug())
<< "LedgerReplayTask Failed, too many timeouts " << hash_;
}
else
{
trigger(sl);
}
}
std::weak_ptr<TimeoutCounter>
LedgerReplayTask::pmDowncast()
{
return shared_from_this();
}
void
LedgerReplayTask::addDelta(std::shared_ptr<LedgerDeltaAcquire> const& delta)
{
std::weak_ptr<LedgerReplayTask> wptr = shared_from_this();
delta->addDataCallback(
parameter_.reason_, [wptr](bool good, uint256 const& hash) {
if (auto sptr = wptr.lock(); sptr)
{
if (!good)
sptr->cancel();
else
sptr->deltaReady(hash);
}
});
ScopedLockType sl(mtx_);
if (!isDone())
{
JLOG(journal_.trace())
<< "addDelta task " << hash_ << " deltaIndex=" << deltaToBuild_
<< " totalDeltas=" << deltas_.size();
assert(
deltas_.empty() ||
deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_);
deltas_.push_back(delta);
}
}
bool
LedgerReplayTask::finished() const
{
ScopedLockType sl(mtx_);
return isDone();
}
} // namespace ripple

View File

@@ -0,0 +1,283 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
#include <ripple/app/ledger/impl/SkipListAcquire.h>
#include <ripple/core/JobQueue.h>
namespace ripple {
LedgerReplayer::LedgerReplayer(
Application& app,
InboundLedgers& inboundLedgers,
std::unique_ptr<PeerSetBuilder> peerSetBuilder,
Stoppable& parent)
: Stoppable("LedgerReplayer", parent)
, app_(app)
, inboundLedgers_(inboundLedgers)
, peerSetBuilder_(std::move(peerSetBuilder))
, j_(app.journal("LedgerReplayer"))
{
}
LedgerReplayer::~LedgerReplayer()
{
std::lock_guard<std::mutex> lock(mtx_);
tasks_.clear();
}
void
LedgerReplayer::replay(
InboundLedger::Reason r,
uint256 const& finishLedgerHash,
std::uint32_t totalNumLedgers)
{
assert(
finishLedgerHash.isNonZero() && totalNumLedgers > 0 &&
totalNumLedgers <= LedgerReplayParameters::MAX_TASK_SIZE);
LedgerReplayTask::TaskParameter parameter(
r, finishLedgerHash, totalNumLedgers);
std::shared_ptr<LedgerReplayTask> task;
std::shared_ptr<SkipListAcquire> skipList;
bool newSkipList = false;
{
std::lock_guard<std::mutex> lock(mtx_);
if (isStopping())
return;
if (tasks_.size() >= LedgerReplayParameters::MAX_TASKS)
{
JLOG(j_.info()) << "Too many replay tasks, dropping new task "
<< parameter.finishHash_;
return;
}
for (auto const& t : tasks_)
{
if (parameter.canMergeInto(t->getTaskParameter()))
{
JLOG(j_.info()) << "Task " << parameter.finishHash_ << " with "
<< totalNumLedgers
<< " ledgers merged into an existing task.";
return;
}
}
JLOG(j_.info()) << "Replay " << totalNumLedgers
<< " ledgers. Finish ledger hash "
<< parameter.finishHash_;
auto i = skipLists_.find(parameter.finishHash_);
if (i != skipLists_.end())
skipList = i->second.lock();
if (!skipList) // cannot find, or found but cannot lock
{
skipList = std::make_shared<SkipListAcquire>(
app_,
inboundLedgers_,
parameter.finishHash_,
peerSetBuilder_->build());
skipLists_[parameter.finishHash_] = skipList;
newSkipList = true;
}
task = std::make_shared<LedgerReplayTask>(
app_, inboundLedgers_, *this, skipList, std::move(parameter));
tasks_.push_back(task);
}
if (newSkipList)
skipList->init(1);
// task init after skipList init, could save a timeout
task->init();
}
void
LedgerReplayer::createDeltas(std::shared_ptr<LedgerReplayTask> task)
{
{
// TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
// check if the last closed or validated ledger l the local node has
// is in the skip list and is an ancestor of parameter.startLedger
// that has to be downloaded, if so expand the task to start with l.
}
auto const& parameter = task->getTaskParameter();
JLOG(j_.trace()) << "Creating " << parameter.totalLedgers_ - 1 << " deltas";
if (parameter.totalLedgers_ > 1)
{
auto skipListItem = std::find(
parameter.skipList_.begin(),
parameter.skipList_.end(),
parameter.startHash_);
if (skipListItem == parameter.skipList_.end() ||
++skipListItem == parameter.skipList_.end())
{
JLOG(j_.error()) << "Task parameter error when creating deltas "
<< parameter.finishHash_;
return;
}
for (std::uint32_t seq = parameter.startSeq_ + 1;
seq <= parameter.finishSeq_ &&
skipListItem != parameter.skipList_.end();
++seq, ++skipListItem)
{
std::shared_ptr<LedgerDeltaAcquire> delta;
bool newDelta = false;
{
std::lock_guard<std::mutex> lock(mtx_);
if (isStopping())
return;
auto i = deltas_.find(*skipListItem);
if (i != deltas_.end())
delta = i->second.lock();
if (!delta) // cannot find, or found but cannot lock
{
delta = std::make_shared<LedgerDeltaAcquire>(
app_,
inboundLedgers_,
*skipListItem,
seq,
peerSetBuilder_->build());
deltas_[*skipListItem] = delta;
newDelta = true;
}
}
task->addDelta(delta);
if (newDelta)
delta->init(1);
}
}
}
void
LedgerReplayer::gotSkipList(
LedgerInfo const& info,
std::shared_ptr<SHAMapItem const> const& item)
{
std::shared_ptr<SkipListAcquire> skipList = {};
{
std::lock_guard<std::mutex> lock(mtx_);
auto i = skipLists_.find(info.hash);
if (i == skipLists_.end())
return;
skipList = i->second.lock();
if (!skipList)
{
skipLists_.erase(i);
return;
}
}
if (skipList)
skipList->processData(info.seq, item);
}
void
LedgerReplayer::gotReplayDelta(
LedgerInfo const& info,
std::map<std::uint32_t, std::shared_ptr<STTx const>>&& txns)
{
std::shared_ptr<LedgerDeltaAcquire> delta = {};
{
std::lock_guard<std::mutex> lock(mtx_);
auto i = deltas_.find(info.hash);
if (i == deltas_.end())
return;
delta = i->second.lock();
if (!delta)
{
deltas_.erase(i);
return;
}
}
if (delta)
delta->processData(info, std::move(txns));
}
void
LedgerReplayer::sweep()
{
std::lock_guard<std::mutex> lock(mtx_);
JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size()
<< " tasks, " << skipLists_.size() << " skipLists, and "
<< deltas_.size() << " deltas.";
tasks_.erase(
std::remove_if(
tasks_.begin(),
tasks_.end(),
[this](auto const& t) -> bool {
if (t->finished())
{
JLOG(j_.debug())
<< "Sweep task " << t->getTaskParameter().finishHash_;
return true;
}
return false;
}),
tasks_.end());
auto removeCannotLocked = [](auto& subTasks) {
for (auto it = subTasks.begin(); it != subTasks.end();)
{
if (auto item = it->second.lock(); !item)
{
it = subTasks.erase(it);
}
else
++it;
}
};
removeCannotLocked(skipLists_);
removeCannotLocked(deltas_);
}
void
LedgerReplayer::onStop()
{
JLOG(j_.info()) << "Stopping...";
{
std::lock_guard<std::mutex> lock(mtx_);
std::for_each(
tasks_.begin(), tasks_.end(), [](auto& i) { i->cancel(); });
tasks_.clear();
auto lockAndCancel = [](auto& i) {
if (auto sptr = i.second.lock(); sptr)
{
sptr->cancel();
}
};
std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
skipLists_.clear();
std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
deltas_.clear();
}
stopped();
JLOG(j_.info()) << "Stopped";
}
} // namespace ripple

View File

@@ -0,0 +1,241 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/impl/SkipListAcquire.h>
#include <ripple/app/main/Application.h>
#include <ripple/core/JobQueue.h>
#include <ripple/overlay/PeerSet.h>
namespace ripple {
SkipListAcquire::SkipListAcquire(
Application& app,
InboundLedgers& inboundLedgers,
uint256 const& ledgerHash,
std::unique_ptr<PeerSet> peerSet)
: TimeoutCounter(
app,
ledgerHash,
LedgerReplayParameters::SUB_TASK_TIMEOUT,
{jtREPLAY_TASK,
"SkipListAcquire",
LedgerReplayParameters::MAX_QUEUED_TASKS},
app.journal("LedgerReplaySkipList"))
, inboundLedgers_(inboundLedgers)
, peerSet_(std::move(peerSet))
{
JLOG(journal_.trace()) << "Create " << hash_;
}
SkipListAcquire::~SkipListAcquire()
{
JLOG(journal_.trace()) << "Destroy " << hash_;
}
void
SkipListAcquire::init(int numPeers)
{
ScopedLockType sl(mtx_);
if (!isDone())
{
trigger(numPeers, sl);
setTimer(sl);
}
}
void
SkipListAcquire::trigger(std::size_t limit, ScopedLockType& sl)
{
if (auto const l = app_.getLedgerMaster().getLedgerByHash(hash_); l)
{
JLOG(journal_.trace()) << "existing ledger " << hash_;
retrieveSkipList(l, sl);
return;
}
if (!fallBack_)
{
peerSet_->addPeers(
limit,
[this](auto peer) {
return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
peer->hasLedger(hash_, 0);
},
[this](auto peer) {
if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
{
JLOG(journal_.trace())
<< "Add a peer " << peer->id() << " for " << hash_;
protocol::TMProofPathRequest request;
request.set_ledgerhash(hash_.data(), hash_.size());
request.set_key(
keylet::skip().key.data(), keylet::skip().key.size());
request.set_type(
protocol::TMLedgerMapType::lmACCOUNT_STATE);
peerSet_->sendRequest(request, peer);
}
else
{
JLOG(journal_.trace()) << "Add a no feature peer "
<< peer->id() << " for " << hash_;
if (++noFeaturePeerCount_ >=
LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
{
JLOG(journal_.debug()) << "Fall back for " << hash_;
timerInterval_ =
LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT;
fallBack_ = true;
}
}
});
}
if (fallBack_)
inboundLedgers_.acquire(hash_, 0, InboundLedger::Reason::GENERIC);
}
void
SkipListAcquire::onTimer(bool progress, ScopedLockType& sl)
{
JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
if (timeouts_ > LedgerReplayParameters::SUB_TASK_MAX_TIMEOUTS)
{
failed_ = true;
JLOG(journal_.debug()) << "too many timeouts " << hash_;
notify(sl);
}
else
{
trigger(1, sl);
}
}
std::weak_ptr<TimeoutCounter>
SkipListAcquire::pmDowncast()
{
return shared_from_this();
}
void
SkipListAcquire::processData(
std::uint32_t ledgerSeq,
std::shared_ptr<SHAMapItem const> const& item)
{
assert(ledgerSeq != 0 && item);
ScopedLockType sl(mtx_);
if (isDone())
return;
JLOG(journal_.trace()) << "got data for " << hash_;
try
{
if (auto sle = std::make_shared<SLE>(
SerialIter{item->data(), item->size()}, item->key());
sle)
{
if (auto const& skipList = sle->getFieldV256(sfHashes).value();
!skipList.empty())
onSkipListAcquired(skipList, ledgerSeq, sl);
return;
}
}
catch (...)
{
}
failed_ = true;
JLOG(journal_.error()) << "failed to retrieve Skip list from verified data "
<< hash_;
notify(sl);
}
void
SkipListAcquire::addDataCallback(OnSkipListDataCB&& cb)
{
ScopedLockType sl(mtx_);
dataReadyCallbacks_.emplace_back(std::move(cb));
if (isDone())
{
JLOG(journal_.debug())
<< "task added to a finished SkipListAcquire " << hash_;
notify(sl);
}
}
std::shared_ptr<SkipListAcquire::SkipListData const>
SkipListAcquire::getData() const
{
ScopedLockType sl(mtx_);
return data_;
}
void
SkipListAcquire::retrieveSkipList(
std::shared_ptr<Ledger const> const& ledger,
ScopedLockType& sl)
{
if (auto const hashIndex = ledger->read(keylet::skip());
hashIndex && hashIndex->isFieldPresent(sfHashes))
{
auto const& slist = hashIndex->getFieldV256(sfHashes).value();
if (!slist.empty())
{
onSkipListAcquired(slist, ledger->seq(), sl);
return;
}
}
failed_ = true;
JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger "
<< hash_;
notify(sl);
}
void
SkipListAcquire::onSkipListAcquired(
std::vector<uint256> const& skipList,
std::uint32_t ledgerSeq,
ScopedLockType& sl)
{
complete_ = true;
data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
JLOG(journal_.debug()) << "Skip list acquired " << hash_;
notify(sl);
}
void
SkipListAcquire::notify(ScopedLockType& sl)
{
assert(isDone());
std::vector<OnSkipListDataCB> toCall;
std::swap(toCall, dataReadyCallbacks_);
auto const good = !failed_;
sl.unlock();
for (auto& cb : toCall)
{
cb(good, hash_);
}
sl.lock();
}
} // namespace ripple

View File

@@ -0,0 +1,174 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_SKIPLISTACQUIRE_H_INCLUDED
#define RIPPLE_APP_LEDGER_SKIPLISTACQUIRE_H_INCLUDED
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/impl/TimeoutCounter.h>
#include <ripple/app/main/Application.h>
#include <ripple/shamap/SHAMap.h>
#include <queue>
namespace ripple {
class InboundLedgers;
class PeerSet;
namespace test {
class LedgerReplayClient;
} // namespace test
/**
* Manage the retrieval of a skip list in a ledger from the network.
* Before asking peers, always check if the local node has the ledger.
*/
class SkipListAcquire final
: public TimeoutCounter,
public std::enable_shared_from_this<SkipListAcquire>,
public CountedObject<SkipListAcquire>
{
public:
/**
* A callback used to notify that the SkipList is ready or failed.
* @param successful if the skipList data was acquired successfully
* @param hash hash of the ledger that has the skipList
*/
using OnSkipListDataCB =
std::function<void(bool successful, uint256 const& hash)>;
struct SkipListData
{
std::uint32_t const ledgerSeq;
std::vector<ripple::uint256> const skipList;
SkipListData(
std::uint32_t const ledgerSeq,
std::vector<ripple::uint256> const& skipList)
: ledgerSeq(ledgerSeq), skipList(skipList)
{
}
};
/**
* Constructor
* @param app Application reference
* @param inboundLedgers InboundLedgers reference
* @param ledgerHash hash of the ledger that has the skip list
* @param peerSet manage a set of peers that we will ask for the skip list
*/
SkipListAcquire(
Application& app,
InboundLedgers& inboundLedgers,
uint256 const& ledgerHash,
std::unique_ptr<PeerSet> peerSet);
~SkipListAcquire() override;
/**
* Start the SkipListAcquire task
* @param numPeers number of peers to try initially
*/
void
init(int numPeers);
/**
* Process the data extracted from a peer's reply
* @param ledgerSeq sequence number of the ledger that has the skip list
* @param item holder of the skip list
* @note ledgerSeq and item must have been verified against the ledger hash
*/
void
processData(
std::uint32_t ledgerSeq,
std::shared_ptr<SHAMapItem const> const& item);
/**
* Add a callback that will be called when the skipList is ready or failed.
* @note the callback will be called once and only once unless this object
* is destructed before the call.
*/
void
addDataCallback(OnSkipListDataCB&& cb);
std::shared_ptr<SkipListData const>
getData() const;
static char const*
getCountedObjectName()
{
return "SkipListAcquire";
}
private:
void
onTimer(bool progress, ScopedLockType& peerSetLock) override;
std::weak_ptr<TimeoutCounter>
pmDowncast() override;
/**
* Trigger another round
* @param limit number of new peers to send the request
* @param sl lock. this function must be called with the lock
*/
void
trigger(std::size_t limit, ScopedLockType& sl);
/**
* Retrieve the skip list from the ledger
* @param ledger the ledger that has the skip list
* @param sl lock. this function must be called with the lock
*/
void
retrieveSkipList(
std::shared_ptr<Ledger const> const& ledger,
ScopedLockType& sl);
/**
* Process the skip list
* @param skipList skip list
* @param ledgerSeq sequence number of the ledger that has the skip list
* @param sl lock. this function must be called with the lock
*/
void
onSkipListAcquired(
std::vector<uint256> const& skipList,
std::uint32_t ledgerSeq,
ScopedLockType& sl);
/**
* Call the OnSkipListDataCB callbacks
* @param sl lock. this function must be called with the lock
*/
void
notify(ScopedLockType& sl);
InboundLedgers& inboundLedgers_;
std::unique_ptr<PeerSet> peerSet_;
std::vector<OnSkipListDataCB> dataReadyCallbacks_;
std::shared_ptr<SkipListData const> data_;
std::uint32_t noFeaturePeerCount_ = 0;
bool fallBack_ = false;
friend class test::LedgerReplayClient;
};
} // namespace ripple
#endif

View File

@@ -0,0 +1,128 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/ledger/impl/TimeoutCounter.h>
#include <ripple/app/main/Application.h>
#include <ripple/core/JobQueue.h>
#include <ripple/overlay/Overlay.h>
namespace ripple {
using namespace std::chrono_literals;
TimeoutCounter::TimeoutCounter(
Application& app,
uint256 const& hash,
std::chrono::milliseconds interval,
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, hash_(hash)
, timeouts_(0)
, complete_(false)
, failed_(false)
, progress_(false)
, timerInterval_(interval)
, queueJobParameter_(std::move(jobParameter))
, timer_(app_.getIOService())
{
assert((timerInterval_ > 10ms) && (timerInterval_ < 30s));
}
void
TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
timer_.expires_after(timerInterval_);
timer_.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
return;
if (auto ptr = wptr.lock())
{
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}
});
}
void
TimeoutCounter::queueJob(ScopedLockType& sl)
{
if (isDone())
return;
if (queueJobParameter_.jobLimit &&
app_.getJobQueue().getJobCountTotal(queueJobParameter_.jobType) >=
queueJobParameter_.jobLimit)
{
JLOG(journal_.debug()) << "Deferring " << queueJobParameter_.jobName
<< " timer due to load";
setTimer(sl);
return;
}
app_.getJobQueue().addJob(
queueJobParameter_.jobType,
queueJobParameter_.jobName,
[wptr = pmDowncast()](Job&) {
if (auto sptr = wptr.lock(); sptr)
sptr->invokeOnTimer();
});
}
void
TimeoutCounter::invokeOnTimer()
{
ScopedLockType sl(mtx_);
if (isDone())
return;
if (!progress_)
{
++timeouts_;
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
<< " acquiring " << hash_;
onTimer(false, sl);
}
else
{
progress_ = false;
onTimer(true, sl);
}
if (!isDone())
setTimer(sl);
}
void
TimeoutCounter::cancel()
{
ScopedLockType sl(mtx_);
if (!isDone())
{
failed_ = true;
JLOG(journal_.info()) << "Cancel " << hash_;
}
}
} // namespace ripple

View File

@@ -0,0 +1,152 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LEDGER_TIMEOUTCOUNTER_H_INCLUDED
#define RIPPLE_APP_LEDGER_TIMEOUTCOUNTER_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <mutex>
namespace ripple {
/**
This class is an "active" object. It maintains its own timer
and dispatches work to a job queue. Implementations derive
from this class and override the abstract hook functions in
the base.
This class implements an asynchronous loop:
1. The entry point is `setTimer`.
2. After `mTimerInterval`, `queueJob` is called, which schedules a job to
call `invokeOnTimer` (or loops back to setTimer if there are too many
concurrent jobs).
3. The job queue calls `invokeOnTimer` which either breaks the loop if
`isDone` or calls `onTimer`.
4. `onTimer` is the only "real" virtual method in this class. It is the
callback for when the timeout expires. Generally, its only responsibility
is to set `mFailed = true`. However, if it wants to implement a policy of
retries, then it has a chance to just increment a count of expired
timeouts.
5. Once `onTimer` returns, if the object is still not `isDone`, then
`invokeOnTimer` sets another timeout by looping back to setTimer.
This loop executes concurrently with another asynchronous sequence,
implemented by the subtype, that is trying to make progress and eventually
set `mComplete = true`. While it is making progress but not complete, it
should set `mProgress = true`, which is passed to onTimer so it can decide
whether to postpone failure and reset the timeout. However, if it can
complete all its work in one synchronous step (while it holds the lock), then
it can ignore `mProgress`.
*/
class TimeoutCounter
{
public:
/**
* Cancel the task by marking it as failed if the task is not done.
* @note this function does not attempt to cancel the scheduled timer or
* to remove the queued job if any. When the timer expires or
* the queued job starts, however, the code will see that
* the task is done and returns immediately, if it can lock
* the weak pointer of the task.
*/
virtual void
cancel();
protected:
using ScopedLockType = std::unique_lock<std::recursive_mutex>;
struct QueueJobParameter
{
JobType jobType;
std::string jobName;
std::optional<std::uint32_t> jobLimit;
};
TimeoutCounter(
Application& app,
uint256 const& targetHash,
std::chrono::milliseconds timeoutInterval,
QueueJobParameter&& jobParameter,
beast::Journal journal);
virtual ~TimeoutCounter() = default;
/** Schedule a call to queueJob() after mTimerInterval. */
void
setTimer(ScopedLockType&);
/** Queue a job to call invokeOnTimer(). */
void
queueJob(ScopedLockType&);
/** Hook called from invokeOnTimer(). */
virtual void
onTimer(bool progress, ScopedLockType&) = 0;
/** Return a weak pointer to this. */
virtual std::weak_ptr<TimeoutCounter>
pmDowncast() = 0;
bool
isDone() const
{
return complete_ || failed_;
}
// Used in this class for access to boost::asio::io_service and
// ripple::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;
/** The hash of the object (in practice, always a ledger) we are trying to
* fetch. */
uint256 const hash_;
int timeouts_;
bool complete_;
bool failed_;
/** Whether forward progress has been made. */
bool progress_;
/** The minimum time to wait between calls to execute(). */
std::chrono::milliseconds timerInterval_;
QueueJobParameter queueJobParameter_;
private:
/** Calls onTimer() if in the right state.
* Only called by queueJob().
*/
void
invokeOnTimer();
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
};
} // namespace ripple
#endif

View File

@@ -24,6 +24,7 @@
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
#include <memory>
@@ -39,39 +40,39 @@ enum {
MAX_TIMEOUTS = 20,
};
TransactionAcquire::TransactionAcquire(Application& app, uint256 const& hash)
: PeerSet(app, hash, TX_ACQUIRE_TIMEOUT, app.journal("TransactionAcquire"))
TransactionAcquire::TransactionAcquire(
Application& app,
uint256 const& hash,
std::unique_ptr<PeerSet> peerSet)
: TimeoutCounter(
app,
hash,
TX_ACQUIRE_TIMEOUT,
{jtTXN_DATA, "TransactionAcquire", {}},
app.journal("TransactionAcquire"))
, mHaveRoot(false)
, mPeerSet(std::move(peerSet))
{
mMap = std::make_shared<SHAMap>(
SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
mMap->setUnbacked();
}
void
TransactionAcquire::queueJob()
{
app_.getJobQueue().addJob(
jtTXN_DATA, "TransactionAcquire", [ptr = shared_from_this()](Job&) {
ptr->invokeOnTimer();
});
}
void
TransactionAcquire::done()
{
// We hold a PeerSet lock and so cannot do real work here
if (mFailed)
if (failed_)
{
JLOG(m_journal.warn()) << "Failed to acquire TX set " << mHash;
JLOG(journal_.warn()) << "Failed to acquire TX set " << hash_;
}
else
{
JLOG(m_journal.debug()) << "Acquired TX set " << mHash;
JLOG(journal_.debug()) << "Acquired TX set " << hash_;
mMap->setImmutable();
uint256 const& hash(mHash);
uint256 const& hash(hash_);
std::shared_ptr<SHAMap> const& map(mMap);
auto const pap = &app_;
// Note that, when we're in the process of shutting down, addJob()
@@ -89,20 +90,20 @@ TransactionAcquire::done()
void
TransactionAcquire::onTimer(bool progress, ScopedLockType& psl)
{
if (mTimeouts > MAX_TIMEOUTS)
if (timeouts_ > MAX_TIMEOUTS)
{
mFailed = true;
failed_ = true;
done();
return;
}
if (mTimeouts >= NORM_TIMEOUTS)
if (timeouts_ >= NORM_TIMEOUTS)
trigger(nullptr);
addPeers(1);
}
std::weak_ptr<PeerSet>
std::weak_ptr<TimeoutCounter>
TransactionAcquire::pmDowncast()
{
return shared_from_this();
@@ -111,35 +112,35 @@ TransactionAcquire::pmDowncast()
void
TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer)
{
if (mComplete)
if (complete_)
{
JLOG(m_journal.info()) << "trigger after complete";
JLOG(journal_.info()) << "trigger after complete";
return;
}
if (mFailed)
if (failed_)
{
JLOG(m_journal.info()) << "trigger after fail";
JLOG(journal_.info()) << "trigger after fail";
return;
}
if (!mHaveRoot)
{
JLOG(m_journal.trace()) << "TransactionAcquire::trigger "
<< (peer ? "havePeer" : "noPeer") << " no root";
JLOG(journal_.trace()) << "TransactionAcquire::trigger "
<< (peer ? "havePeer" : "noPeer") << " no root";
protocol::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_ledgerhash(hash_.begin(), hash_.size());
tmGL.set_itype(protocol::liTS_CANDIDATE);
tmGL.set_querydepth(3); // We probably need the whole thing
if (mTimeouts != 0)
if (timeouts_ != 0)
tmGL.set_querytype(protocol::qtINDIRECT);
*(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
sendRequest(tmGL, peer);
mPeerSet->sendRequest(tmGL, peer);
}
else if (!mMap->isValid())
{
mFailed = true;
failed_ = true;
done();
}
else
@@ -150,26 +151,26 @@ TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer)
if (nodes.empty())
{
if (mMap->isValid())
mComplete = true;
complete_ = true;
else
mFailed = true;
failed_ = true;
done();
return;
}
protocol::TMGetLedger tmGL;
tmGL.set_ledgerhash(mHash.begin(), mHash.size());
tmGL.set_ledgerhash(hash_.begin(), hash_.size());
tmGL.set_itype(protocol::liTS_CANDIDATE);
if (mTimeouts != 0)
if (timeouts_ != 0)
tmGL.set_querytype(protocol::qtINDIRECT);
for (auto const& node : nodes)
{
*tmGL.add_nodeids() = node.first.getRawString();
}
sendRequest(tmGL, peer);
mPeerSet->sendRequest(tmGL, peer);
}
}
@@ -179,17 +180,17 @@ TransactionAcquire::takeNodes(
const std::list<Blob>& data,
std::shared_ptr<Peer> const& peer)
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
if (mComplete)
if (complete_)
{
JLOG(m_journal.trace()) << "TX set complete";
JLOG(journal_.trace()) << "TX set complete";
return SHAMapAddNode();
}
if (mFailed)
if (failed_)
{
JLOG(m_journal.trace()) << "TX set failed";
JLOG(journal_.trace()) << "TX set failed";
return SHAMapAddNode();
}
@@ -207,15 +208,15 @@ TransactionAcquire::takeNodes(
if (nodeIDit->isRoot())
{
if (mHaveRoot)
JLOG(m_journal.debug())
JLOG(journal_.debug())
<< "Got root TXS node, already have it";
else if (!mMap->addRootNode(
SHAMapHash{mHash},
SHAMapHash{hash_},
makeSlice(*nodeDatait),
nullptr)
.isGood())
{
JLOG(m_journal.warn()) << "TX acquire got bad root node";
JLOG(journal_.warn()) << "TX acquire got bad root node";
}
else
mHaveRoot = true;
@@ -223,7 +224,7 @@ TransactionAcquire::takeNodes(
else if (!mMap->addKnownNode(*nodeIDit, makeSlice(*nodeDatait), &sf)
.isGood())
{
JLOG(m_journal.warn()) << "TX acquire got bad non-root node";
JLOG(journal_.warn()) << "TX acquire got bad non-root node";
return SHAMapAddNode::invalid();
}
@@ -232,12 +233,12 @@ TransactionAcquire::takeNodes(
}
trigger(peer);
mProgress = true;
progress_ = true;
return SHAMapAddNode::useful();
}
catch (std::exception const&)
{
JLOG(m_journal.error()) << "Peer sends us junky transaction node data";
JLOG(journal_.error()) << "Peer sends us junky transaction node data";
return SHAMapAddNode::invalid();
}
}
@@ -245,27 +246,29 @@ TransactionAcquire::takeNodes(
void
TransactionAcquire::addPeers(std::size_t limit)
{
PeerSet::addPeers(
limit, [this](auto peer) { return peer->hasTxSet(mHash); });
mPeerSet->addPeers(
limit,
[this](auto peer) { return peer->hasTxSet(hash_); },
[this](auto peer) { trigger(peer); });
}
void
TransactionAcquire::init(int numPeers)
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
addPeers(numPeers);
setTimer();
setTimer(sl);
}
void
TransactionAcquire::stillNeed()
{
ScopedLockType sl(mLock);
ScopedLockType sl(mtx_);
if (mTimeouts > NORM_TIMEOUTS)
mTimeouts = NORM_TIMEOUTS;
if (timeouts_ > NORM_TIMEOUTS)
timeouts_ = NORM_TIMEOUTS;
}
} // namespace ripple

View File

@@ -29,14 +29,17 @@ namespace ripple {
// VFALCO TODO rename to PeerTxRequest
// A transaction set we are trying to acquire
class TransactionAcquire final
: public PeerSet,
: public TimeoutCounter,
public std::enable_shared_from_this<TransactionAcquire>,
public CountedObject<TransactionAcquire>
{
public:
using pointer = std::shared_ptr<TransactionAcquire>;
TransactionAcquire(Application& app, uint256 const& hash);
TransactionAcquire(
Application& app,
uint256 const& hash,
std::unique_ptr<PeerSet> peerSet);
~TransactionAcquire() = default;
SHAMapAddNode
@@ -54,19 +57,11 @@ public:
private:
std::shared_ptr<SHAMap> mMap;
bool mHaveRoot;
void
queueJob() override;
std::unique_ptr<PeerSet> mPeerSet;
void
onTimer(bool progress, ScopedLockType& peerSetLock) override;
void
onPeerAdded(std::shared_ptr<Peer> const& peer) override
{
trigger(peer);
}
void
done();
@@ -75,7 +70,7 @@ private:
void
trigger(std::shared_ptr<Peer> const&);
std::weak_ptr<PeerSet>
std::weak_ptr<TimeoutCounter>
pmDowncast() override;
};

View File

@@ -21,6 +21,7 @@
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/InboundTransactions.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerReplayer.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/ledger/OrderBookDB.h>
@@ -59,6 +60,7 @@
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/PeerReservationTable.h>
#include <ripple/overlay/PeerSet.h>
#include <ripple/overlay/make_Overlay.h>
#include <ripple/protocol/BuildInfo.h>
#include <ripple/protocol/Feature.h>
@@ -195,6 +197,7 @@ public:
std::unique_ptr<LedgerMaster> m_ledgerMaster;
std::unique_ptr<InboundLedgers> m_inboundLedgers;
std::unique_ptr<InboundTransactions> m_inboundTransactions;
std::unique_ptr<LedgerReplayer> m_ledgerReplayer;
TaggedCache<uint256, AcceptedLedger> m_acceptedLedgerCache;
std::unique_ptr<NetworkOPs> m_networkOPs;
std::unique_ptr<Cluster> cluster_;
@@ -367,6 +370,12 @@ public:
gotTXSet(set, fromAcquire);
}))
, m_ledgerReplayer(std::make_unique<LedgerReplayer>(
*this,
*m_inboundLedgers,
make_PeerSetBuilder(*this),
*m_jobQueue))
, m_acceptedLedgerCache(
"AcceptedLedger",
4,
@@ -583,6 +592,12 @@ public:
return *m_ledgerMaster;
}
LedgerReplayer&
getLedgerReplayer() override
{
return *m_ledgerReplayer;
}
InboundLedgers&
getInboundLedgers() override
{
@@ -1270,6 +1285,7 @@ public:
getTempNodeCache().sweep();
getValidations().expire();
getInboundLedgers().sweep();
getLedgerReplayer().sweep();
m_acceptedLedgerCache.sweep();
cachedSLEs_.expire();
@@ -2022,7 +2038,8 @@ ApplicationImp::loadOldLedger(
hash,
0,
InboundLedger::Reason::GENERIC,
stopwatch());
stopwatch(),
make_DummyPeerSet(*this));
if (il->checkLocal())
loadLedger = il->getLedger();
}
@@ -2065,7 +2082,8 @@ ApplicationImp::loadOldLedger(
replayLedger->info().parentHash,
0,
InboundLedger::Reason::GENERIC,
stopwatch());
stopwatch(),
make_DummyPeerSet(*this));
if (il->checkLocal())
loadLedger = il->getLedger();

View File

@@ -64,6 +64,7 @@ class InboundTransactions;
class AcceptedLedger;
class Ledger;
class LedgerMaster;
class LedgerReplayer;
class LoadManager;
class ManifestCache;
class ValidatorKeys;
@@ -204,6 +205,8 @@ public:
virtual LedgerMaster&
getLedgerMaster() = 0;
virtual LedgerReplayer&
getLedgerReplayer() = 0;
virtual NetworkOPs&
getOPs() = 0;
virtual OrderBookDB&

View File

@@ -185,6 +185,9 @@ public:
// Compression
bool COMPRESSION = false;
// Enable the experimental Ledger Replay functionality
bool LEDGER_REPLAY = false;
// Work queue limits
int MAX_TRANSACTIONS = 250;
static constexpr int MAX_JOB_QUEUE_TX = 1000;

View File

@@ -96,6 +96,7 @@ struct ConfigSection
#define SECTION_VALIDATOR_TOKEN "validator_token"
#define SECTION_VETO_AMENDMENTS "veto_amendments"
#define SECTION_WORKERS "workers"
#define SECTION_LEDGER_REPLAY "ledger_replay"
} // namespace ripple

View File

@@ -42,8 +42,10 @@ enum JobType {
jtPUBOLDLEDGER, // An old ledger has been accepted
jtVALIDATION_ut, // A validation from an untrusted source
jtTRANSACTION_l, // A local transaction
jtREPLAY_REQ, // Peer request a ledger delta or a skip list
jtLEDGER_REQ, // Peer request ledger/txnset data
jtPROPOSAL_ut, // A proposal from an untrusted source
jtREPLAY_TASK, // A Ledger replay task/subtask
jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtCLIENT, // A websocket command from the client
jtRPC, // A websocket command from the client

View File

@@ -57,8 +57,10 @@ private:
2000ms,
5000ms);
add(jtTRANSACTION_l, "localTransaction", maxLimit, false, 100ms, 500ms);
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, false, 250ms, 1000ms);
add(jtLEDGER_REQ, "ledgerRequest", 2, false, 0ms, 0ms);
add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, false, 500ms, 1250ms);
add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, false, 0ms, 0ms);
add(jtLEDGER_DATA, "ledgerData", 2, false, 0ms, 0ms);
add(jtCLIENT, "clientCommand", maxLimit, false, 2000ms, 5000ms);
add(jtRPC, "RPC", maxLimit, false, 0ms, 0ms);

View File

@@ -529,6 +529,9 @@ Config::loadFromString(std::string const& fileContents)
if (getSingleSection(secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow<bool>(strTemp);
if (getSingleSection(secConfig, SECTION_LEDGER_REPLAY, strTemp, j_))
LEDGER_REPLAY = beast::lexicalCastThrow<bool>(strTemp);
if (exists(SECTION_REDUCE_RELAY))
{
auto sec = section(SECTION_REDUCE_RELAY);

View File

@@ -37,7 +37,8 @@ static constexpr std::uint32_t csHopLimit = 3;
enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation
ValidatorList2Propagation,
LedgerReplay,
};
/** Represents a peer connection in the overlay. */

View File

@@ -24,6 +24,7 @@
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/overlay/Peer.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <mutex>
#include <set>
@@ -38,94 +39,64 @@ namespace ripple {
Callers maintain the set by adding and removing peers depending
on whether the peers have useful information.
This class is an "active" object. It maintains its own timer
and dispatches work to a job queue. Implementations derive
from this class and override the abstract hook functions in
the base.
The data is represented by its hash.
*/
class PeerSet
{
protected:
using ScopedLockType = std::unique_lock<std::recursive_mutex>;
public:
virtual ~PeerSet() = default;
PeerSet(
Application& app,
uint256 const& hash,
std::chrono::milliseconds interval,
beast::Journal journal);
virtual ~PeerSet() = 0;
/** Add at most `limit` peers to this set from the overlay. */
void
/**
* Try add more peers
* @param limit number of peers to add
* @param hasItem callback that helps to select peers
* @param onPeerAdded callback called when a peer is added
*/
virtual void
addPeers(
std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> score);
std::function<bool(std::shared_ptr<Peer> const&)> hasItem,
std::function<void(std::shared_ptr<Peer> const&)> onPeerAdded) = 0;
/** Hook called from addPeers(). */
virtual void
onPeerAdded(std::shared_ptr<Peer> const&) = 0;
/** Hook called from invokeOnTimer(). */
virtual void
onTimer(bool progress, ScopedLockType&) = 0;
/** Queue a job to call invokeOnTimer(). */
virtual void
queueJob() = 0;
/** Return a weak pointer to this. */
virtual std::weak_ptr<PeerSet>
pmDowncast() = 0;
bool
isDone() const
/** send a message */
template <typename MessageType>
void
sendRequest(MessageType const& message, std::shared_ptr<Peer> const& peer)
{
return mComplete || mFailed;
this->sendRequest(message, protocolMessageType(message), peer);
}
/** Calls onTimer() if in the right state. */
void
invokeOnTimer();
/** Send a GetLedger message to one or all peers. */
void
virtual void
sendRequest(
const protocol::TMGetLedger& message,
std::shared_ptr<Peer> const& peer);
::google::protobuf::Message const& message,
protocol::MessageType type,
std::shared_ptr<Peer> const& peer) = 0;
/** Schedule a call to queueJob() after mTimerInterval. */
void
setTimer();
// Used in this class for access to boost::asio::io_service and
// ripple::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::Journal m_journal;
std::recursive_mutex mLock;
/** The hash of the object (in practice, always a ledger) we are trying to
* fetch. */
uint256 const mHash;
int mTimeouts;
bool mComplete;
bool mFailed;
/** Whether forward progress has been made. */
bool mProgress;
/** The identifiers of the peers we are tracking. */
std::set<Peer::id_t> mPeers;
private:
/** The minimum time to wait between calls to execute(). */
std::chrono::milliseconds mTimerInterval;
// VFALCO TODO move the responsibility for the timer to a higher level
boost::asio::basic_waitable_timer<std::chrono::steady_clock> mTimer;
/** get the set of ids of previously added peers */
virtual const std::set<Peer::id_t>&
getPeerIds() const = 0;
};
class PeerSetBuilder
{
public:
virtual ~PeerSetBuilder() = default;
virtual std::unique_ptr<PeerSet>
build() = 0;
};
std::unique_ptr<PeerSetBuilder>
make_PeerSetBuilder(Application& app);
/**
* Make a dummy PeerSet that does not do anything.
* @note For the use case of InboundLedger in ApplicationImp::loadOldLedger(),
* where a real PeerSet is not needed.
*/
std::unique_ptr<PeerSet>
make_DummyPeerSet(Application& app);
} // namespace ripple
#endif

View File

@@ -204,7 +204,8 @@ ConnectAttempt::onHandshake(error_code ec)
req_ = makeRequest(
!overlay_.peerFinder().config().peerPrivate,
app_.config().COMPRESSION,
app_.config().VP_REDUCE_RELAY_ENABLE);
app_.config().VP_REDUCE_RELAY_ENABLE,
app_.config().LEDGER_REPLAY);
buildHandshake(
req_,

View File

@@ -71,13 +71,18 @@ featureEnabled(
}
std::string
makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled)
makeFeaturesRequestHeader(
bool comprEnabled,
bool vpReduceRelayEnabled,
bool ledgerReplayEnabled)
{
std::stringstream str;
if (comprEnabled)
str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE;
if (vpReduceRelayEnabled)
str << FEATURE_VPRR << "=1";
if (ledgerReplayEnabled)
str << FEATURE_LEDGER_REPLAY << "=1";
return str.str();
}
@@ -85,13 +90,16 @@ std::string
makeFeaturesResponseHeader(
http_request_type const& headers,
bool comprEnabled,
bool vpReduceRelayEnabled)
bool vpReduceRelayEnabled,
bool ledgerReplayEnabled)
{
std::stringstream str;
if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4"))
str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE;
if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR))
str << FEATURE_VPRR << "=1";
if (ledgerReplayEnabled && featureEnabled(headers, FEATURE_LEDGER_REPLAY))
str << FEATURE_LEDGER_REPLAY << "=1";
return str.str();
}
@@ -353,8 +361,11 @@ verifyHandshake(
}
auto
makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled)
-> request_type
makeRequest(
bool crawlPublic,
bool comprEnabled,
bool vpReduceRelayEnabled,
bool ledgerReplayEnabled) -> request_type
{
request_type m;
m.method(boost::beast::http::verb::get);
@@ -367,7 +378,8 @@ makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled)
m.insert("Crawl", crawlPublic ? "public" : "private");
m.insert(
"X-Protocol-Ctl",
makeFeaturesRequestHeader(comprEnabled, vpReduceRelayEnabled));
makeFeaturesRequestHeader(
comprEnabled, vpReduceRelayEnabled, ledgerReplayEnabled));
return m;
}
@@ -395,7 +407,8 @@ makeResponse(
makeFeaturesResponseHeader(
req,
app.config().COMPRESSION,
app.config().VP_REDUCE_RELAY_ENABLE));
app.config().VP_REDUCE_RELAY_ENABLE,
app.config().LEDGER_REPLAY));
buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app);

View File

@@ -96,10 +96,15 @@ verifyHandshake(
@param crawlPublic if true then server's IP/Port are included in crawl
@param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@param ledgerReplayEnabled if true then ledger-replay feature is enabled
@return http request with empty body
*/
request_type
makeRequest(bool crawlPublic, bool comprEnabled, bool vpReduceRelayEnabled);
makeRequest(
bool crawlPublic,
bool comprEnabled,
bool vpReduceRelayEnabled,
bool ledgerReplayEnabled);
/** Make http response
@@ -131,6 +136,8 @@ makeResponse(
static constexpr char FEATURE_COMPR[] = "compr"; // compression
static constexpr char FEATURE_VPRR[] =
"vprr"; // validation/proposal reduce-relay
static constexpr char FEATURE_LEDGER_REPLAY[] =
"ledgerreplay"; // ledger replay
static constexpr char DELIM_FEATURE[] = ";";
static constexpr char DELIM_VALUE[] = ",";
@@ -204,10 +211,14 @@ peerFeatureEnabled(
/** Make request header X-Protocol-Ctl value with supported features
@param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@param ledgerReplayEnabled if true then ledger-replay feature is enabled
@return X-Protocol-Ctl header value
*/
std::string
makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled);
makeFeaturesRequestHeader(
bool comprEnabled,
bool vpReduceRelayEnabled,
bool ledgerReplayEnabled);
/** Make response header X-Protocol-Ctl value with supported features.
If the request has a feature that we support enabled
@@ -216,13 +227,15 @@ makeFeaturesRequestHeader(bool comprEnabled, bool vpReduceRelayEnabled);
@param header request's header
@param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@param ledgerReplayEnabled if true then ledger-replay feature is enabled
@return X-Protocol-Ctl header value
*/
std::string
makeFeaturesResponseHeader(
http_request_type const& headers,
bool comprEnabled,
bool vpReduceRelayEnabled);
bool vpReduceRelayEnabled,
bool ledgerReplayEnabled);
} // namespace ripple

View File

@@ -85,6 +85,7 @@ Message::compress()
case protocol::mtGET_OBJECTS:
case protocol::mtVALIDATORLIST:
case protocol::mtVALIDATORLISTCOLLECTION:
case protocol::mtREPLAY_DELTA_RESPONSE:
return true;
case protocol::mtPING:
case protocol::mtCLUSTER:
@@ -96,6 +97,9 @@ Message::compress()
case protocol::mtSHARD_INFO:
case protocol::mtGET_PEER_SHARD_INFO:
case protocol::mtPEER_SHARD_INFO:
case protocol::mtPROOF_PATH_REQ:
case protocol::mtPROOF_PATH_RESPONSE:
case protocol::mtREPLAY_DELTA_REQ:
break;
}
return false;

View File

@@ -112,6 +112,11 @@ PeerImp::PeerImp(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_ENABLE))
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
JLOG(journal_.debug()) << " compression enabled "
<< (compressionEnabled_ == Compressed::On)
@@ -444,6 +449,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
return protocol_ >= make_protocol(2, 1);
case ProtocolFeature::ValidatorList2Propagation:
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return ledgerReplayEnabled_;
}
return false;
}
@@ -1474,6 +1481,104 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
});
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
{
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
return;
}
fee_ = Resource::feeMediumBurdenPeer;
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvProofPathRequest", [weak, m](Job&) {
if (auto peer = weak.lock())
{
auto reply =
peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
if (reply.has_error())
{
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(Resource::feeInvalidRequest);
else
peer->charge(Resource::feeRequestNoReply);
}
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtPROOF_PATH_RESPONSE));
}
}
});
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
return;
}
if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
{
charge(Resource::feeBadData);
}
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
{
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
return;
}
fee_ = Resource::feeMediumBurdenPeer;
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m](Job&) {
if (auto peer = weak.lock())
{
auto reply =
peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
if (reply.has_error())
{
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
peer->charge(Resource::feeInvalidRequest);
else
peer->charge(Resource::feeRequestNoReply);
}
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtREPLAY_DELTA_RESPONSE));
}
}
});
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{
if (!ledgerReplayEnabled_)
{
charge(Resource::feeInvalidRequest);
return;
}
if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
{
charge(Resource::feeBadData);
}
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{

View File

@@ -21,6 +21,7 @@
#define RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
#include <ripple/app/consensus/RCLCxPeerPos.h>
#include <ripple/app/ledger/impl/LedgerReplayMsgHandler.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/utility/WrappedSink.h>
@@ -169,10 +170,11 @@ private:
hash_map<PublicKey, ShardInfo> shardInfo_;
Compressed compressionEnabled_ = Compressed::Off;
// true if validation/proposal reduce-relay feature is enabled
// on the peer.
bool vpReduceRelayEnabled_ = false;
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
friend class OverlayImpl;
@@ -530,6 +532,14 @@ public:
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
void
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
private:
//--------------------------------------------------------------------------
@@ -624,6 +634,11 @@ PeerImp::PeerImp(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_ENABLE))
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));

View File

@@ -24,32 +24,47 @@
namespace ripple {
using namespace std::chrono_literals;
PeerSet::PeerSet(
Application& app,
uint256 const& hash,
std::chrono::milliseconds interval,
beast::Journal journal)
: app_(app)
, m_journal(journal)
, mHash(hash)
, mTimeouts(0)
, mComplete(false)
, mFailed(false)
, mProgress(false)
, mTimerInterval(interval)
, mTimer(app_.getIOService())
class PeerSetImpl : public PeerSet
{
public:
PeerSetImpl(Application& app);
void
addPeers(
std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> hasItem,
std::function<void(std::shared_ptr<Peer> const&)> onPeerAdded) override;
/** Send a message to one or all peers. */
void
sendRequest(
::google::protobuf::Message const& message,
protocol::MessageType type,
std::shared_ptr<Peer> const& peer) override;
const std::set<Peer::id_t>&
getPeerIds() const override;
private:
// Used in this class for access to boost::asio::io_service and
// ripple::Overlay.
Application& app_;
beast::Journal journal_;
/** The identifiers of the peers we are tracking. */
std::set<Peer::id_t> peers_;
};
PeerSetImpl::PeerSetImpl(Application& app)
: app_(app), journal_(app.journal("PeerSet"))
{
assert((mTimerInterval > 10ms) && (mTimerInterval < 30s));
}
PeerSet::~PeerSet() = default;
void
PeerSet::addPeers(
PeerSetImpl::addPeers(
std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> hasItem)
std::function<bool(std::shared_ptr<Peer> const&)> hasItem,
std::function<void(std::shared_ptr<Peer> const&)> onPeerAdded)
{
using ScoredPeer = std::pair<int, std::shared_ptr<Peer>>;
@@ -71,11 +86,10 @@ PeerSet::addPeers(
});
std::size_t accepted = 0;
ScopedLockType sl(mLock);
for (auto const& pair : pairs)
{
auto const peer = pair.second;
if (!mPeers.insert(peer->id()).second)
if (!peers_.insert(peer->id()).second)
continue;
onPeerAdded(peer);
if (++accepted >= limit)
@@ -84,65 +98,95 @@ PeerSet::addPeers(
}
void
PeerSet::setTimer()
{
mTimer.expires_after(mTimerInterval);
mTimer.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
return;
if (auto ptr = wptr.lock())
ptr->queueJob();
});
}
void
PeerSet::invokeOnTimer()
{
ScopedLockType sl(mLock);
if (isDone())
return;
if (!mProgress)
{
++mTimeouts;
JLOG(m_journal.debug())
<< "Timeout(" << mTimeouts << ") pc=" << mPeers.size()
<< " acquiring " << mHash;
onTimer(false, sl);
}
else
{
mProgress = false;
onTimer(true, sl);
}
if (!isDone())
setTimer();
}
void
PeerSet::sendRequest(
const protocol::TMGetLedger& tmGL,
PeerSetImpl::sendRequest(
::google::protobuf::Message const& message,
protocol::MessageType type,
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(tmGL, protocol::mtGET_LEDGER);
auto packet = std::make_shared<Message>(message, type);
if (peer)
{
peer->send(packet);
return;
}
ScopedLockType sl(mLock);
for (auto id : mPeers)
for (auto id : peers_)
{
if (auto p = app_.overlay().findPeerByShortID(id))
p->send(packet);
}
}
const std::set<Peer::id_t>&
PeerSetImpl::getPeerIds() const
{
return peers_;
}
class PeerSetBuilderImpl : public PeerSetBuilder
{
public:
PeerSetBuilderImpl(Application& app) : app_(app)
{
}
virtual std::unique_ptr<PeerSet>
build() override
{
return std::make_unique<PeerSetImpl>(app_);
}
private:
Application& app_;
};
std::unique_ptr<PeerSetBuilder>
make_PeerSetBuilder(Application& app)
{
return std::make_unique<PeerSetBuilderImpl>(app);
}
class DummyPeerSet : public PeerSet
{
public:
DummyPeerSet(Application& app) : j_(app.journal("DummyPeerSet"))
{
}
void
addPeers(
std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> hasItem,
std::function<void(std::shared_ptr<Peer> const&)> onPeerAdded) override
{
JLOG(j_.error()) << "DummyPeerSet addPeers should not be called";
}
void
sendRequest(
::google::protobuf::Message const& message,
protocol::MessageType type,
std::shared_ptr<Peer> const& peer) override
{
JLOG(j_.error()) << "DummyPeerSet sendRequest should not be called";
}
const std::set<Peer::id_t>&
getPeerIds() const override
{
static std::set<Peer::id_t> emptyPeers;
JLOG(j_.error()) << "DummyPeerSet getPeerIds should not be called";
return emptyPeers;
}
private:
beast::Journal j_;
};
std::unique_ptr<PeerSet>
make_DummyPeerSet(Application& app)
{
return std::make_unique<DummyPeerSet>(app);
}
} // namespace ripple

View File

@@ -32,11 +32,30 @@
#include <cstdint>
#include <memory>
#include <optional>
#include <ripple.pb.h>
#include <type_traits>
#include <vector>
namespace ripple {
inline protocol::MessageType
protocolMessageType(protocol::TMGetLedger const&)
{
return protocol::mtGET_LEDGER;
}
inline protocol::MessageType
protocolMessageType(protocol::TMReplayDeltaRequest const&)
{
return protocol::mtREPLAY_DELTA_REQ;
}
inline protocol::MessageType
protocolMessageType(protocol::TMProofPathRequest const&)
{
return protocol::mtPROOF_PATH_REQ;
}
/** Returns the name of a protocol message given its type. */
template <class = void>
std::string
@@ -82,6 +101,14 @@ protocolMessageName(int type)
return "get_objects";
case protocol::mtSQUELCH:
return "squelch";
case protocol::mtPROOF_PATH_REQ:
return "proof_path_request";
case protocol::mtPROOF_PATH_RESPONSE:
return "proof_path_response";
case protocol::mtREPLAY_DELTA_REQ:
return "replay_delta_request";
case protocol::mtREPLAY_DELTA_RESPONSE:
return "replay_delta_response";
default:
break;
}
@@ -438,6 +465,22 @@ invokeProtocolMessage(
success =
detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
break;
case protocol::mtPROOF_PATH_REQ:
success = detail::invoke<protocol::TMProofPathRequest>(
*header, buffers, handler);
break;
case protocol::mtPROOF_PATH_RESPONSE:
success = detail::invoke<protocol::TMProofPathResponse>(
*header, buffers, handler);
break;
case protocol::mtREPLAY_DELTA_REQ:
success = detail::invoke<protocol::TMReplayDeltaRequest>(
*header, buffers, handler);
break;
case protocol::mtREPLAY_DELTA_RESPONSE:
success = detail::invoke<protocol::TMReplayDeltaResponse>(
*header, buffers, handler);
break;
default:
handler.onMessageUnknown(header->message_type);
success = true;

View File

@@ -142,6 +142,18 @@ TrafficCount::categorize(
: TrafficCount::category::get_hash;
}
if (type == protocol::mtPROOF_PATH_REQ)
return TrafficCount::category::proof_path_request;
if (type == protocol::mtPROOF_PATH_RESPONSE)
return TrafficCount::category::proof_path_response;
if (type == protocol::mtREPLAY_DELTA_REQ)
return TrafficCount::category::replay_delta_request;
if (type == protocol::mtREPLAY_DELTA_RESPONSE)
return TrafficCount::category::replay_delta_response;
return TrafficCount::category::unknown;
}

View File

@@ -140,6 +140,14 @@ public:
share_hash,
get_hash,
// TMProofPathRequest and TMProofPathResponse
proof_path_request,
proof_path_response,
// TMReplayDeltaRequest and TMReplayDeltaResponse
replay_delta_request,
replay_delta_response,
unknown // must be last
};
@@ -224,7 +232,11 @@ protected:
{"getobject_Fetch Pack_get"}, // category::get_fetch_pack
{"getobject_share"}, // category::share_hash
{"getobject_get"}, // category::get_hash
{"unknown"} // category::unknown
{"proof_path_request"}, // category::proof_path_request
{"proof_path_response"}, // category::proof_path_response
{"replay_delta_request"}, // category::replay_delta_request
{"replay_delta_response"}, // category::replay_delta_response
{"unknown"} // category::unknown
}};
};

View File

@@ -25,6 +25,10 @@ enum MessageType
mtVALIDATORLIST = 54;
mtSQUELCH = 55;
mtVALIDATORLISTCOLLECTION = 56;
mtPROOF_PATH_REQ = 57;
mtPROOF_PATH_RESPONSE = 58;
mtREPLAY_DELTA_REQ = 59;
mtREPLAY_DELTA_RESPONSE = 60;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -334,6 +338,7 @@ enum TMReplyError
{
reNO_LEDGER = 1; // We don't have the ledger you are asking about
reNO_NODE = 2; // We don't have any of the nodes you are asking for
reBAD_REQUEST = 3; // The request is wrong, e.g. wrong format
}
message TMLedgerData
@@ -365,3 +370,39 @@ message TMSquelch
optional uint32 squelchDuration = 3; // squelch duration in seconds
}
enum TMLedgerMapType
{
lmTRANASCTION = 1; // transaction map
lmACCOUNT_STATE = 2; // account state map
}
message TMProofPathRequest
{
required bytes key = 1;
required bytes ledgerHash = 2;
required TMLedgerMapType type = 3;
}
message TMProofPathResponse
{
required bytes key = 1;
required bytes ledgerHash = 2;
required TMLedgerMapType type = 3;
optional bytes ledgerHeader = 4;
repeated bytes path = 5;
optional TMReplyError error = 6;
}
message TMReplayDeltaRequest
{
required bytes ledgerHash = 1;
}
message TMReplayDeltaResponse
{
required bytes ledgerHash = 1;
optional bytes ledgerHeader = 2;
repeated bytes transaction = 3;
optional TMReplyError error = 4;
}

View File

@@ -264,6 +264,28 @@ public:
bool fatLeaves,
std::uint32_t depth) const;
/**
* Get the proof path of the key. The proof path is every node on the path
* from leaf to root. Sibling hashes are stored in the parent nodes.
* @param key key of the leaf
* @return the proof path if found
*/
std::optional<std::vector<Blob>>
getProofPath(uint256 const& key) const;
/**
* Verify the proof path
* @param rootHash root hash of the map
* @param key key of the leaf
* @param path the proof path
* @return true if verified successfully
*/
static bool
verifyProofPath(
uint256 const& rootHash,
uint256 const& key,
std::vector<Blob> const& path);
/** Serializes the root in a format appropriate for sending over the wire */
void
serializeRoot(Serializer& s) const;

View File

@@ -69,6 +69,17 @@ public:
SHAMapNodeID
getChildNodeID(unsigned int m) const;
/**
* Create a SHAMapNodeID of a node with the depth of the node and
* the key of a leaf
*
* @param depth the depth of the node
* @param key the key of a leaf
* @return SHAMapNodeID of the node
*/
static SHAMapNodeID
createID(int depth, uint256 const& key);
// FIXME-C++20: use spaceship and operator synthesis
/** Comparison operators */
bool

View File

@@ -132,4 +132,11 @@ selectBranch(SHAMapNodeID const& id, uint256 const& hash)
return branch;
}
SHAMapNodeID
SHAMapNodeID::createID(int depth, uint256 const& key)
{
assert((depth >= 0) && (depth < 65));
return SHAMapNodeID(depth, key & depthMask(depth));
}
} // namespace ripple

View File

@@ -788,4 +788,84 @@ SHAMap::hasLeafNode(uint256 const& tag, SHAMapHash const& targetNodeHash) const
// already
}
std::optional<std::vector<Blob>>
SHAMap::getProofPath(uint256 const& key) const
{
SharedPtrNodeStack stack;
walkTowardsKey(key, &stack);
if (stack.empty())
{
JLOG(journal_.debug()) << "no path to " << key;
return {};
}
if (auto const& node = stack.top().first; !node || node->isInner() ||
std::static_pointer_cast<SHAMapLeafNode>(node)->peekItem()->key() !=
key)
{
JLOG(journal_.debug()) << "no path to " << key;
return {};
}
std::vector<Blob> path;
path.reserve(stack.size());
while (!stack.empty())
{
Serializer s;
stack.top().first->serializeForWire(s);
path.emplace_back(std::move(s.modData()));
stack.pop();
}
JLOG(journal_.debug()) << "getPath for key " << key << ", path length "
<< path.size();
return path;
}
bool
SHAMap::verifyProofPath(
uint256 const& rootHash,
uint256 const& key,
std::vector<Blob> const& path)
{
if (path.empty() || path.size() > 65)
return false;
SHAMapHash hash{rootHash};
try
{
for (auto rit = path.rbegin(); rit != path.rend(); ++rit)
{
auto const& blob = *rit;
auto node = SHAMapTreeNode::makeFromWire(makeSlice(blob));
if (!node)
return false;
node->updateHash();
if (node->getHash() != hash)
return false;
auto depth = std::distance(path.rbegin(), rit);
if (node->isInner())
{
auto nodeId = SHAMapNodeID::createID(depth, key);
hash = static_cast<SHAMapInnerNode*>(node.get())
->getChildHash(selectBranch(nodeId, key));
}
else
{
// should exhaust all the blobs now
return depth + 1 == path.size();
}
}
}
catch (std::exception const&)
{
// the data in the path may come from the network,
// exception could be thrown when parsing the data
return false;
}
return false;
}
} // namespace ripple

File diff suppressed because it is too large Load Diff

View File

@@ -488,7 +488,8 @@ public:
auto request = ripple::makeRequest(
true,
env->app().config().COMPRESSION,
env->app().config().VP_REDUCE_RELAY_ENABLE);
env->app().config().VP_REDUCE_RELAY_ENABLE,
false);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();

View File

@@ -1500,7 +1500,8 @@ vp_squelched=1
auto request = ripple::makeRequest(
true,
env_.app().config().COMPRESSION,
env_.app().config().VP_REDUCE_RELAY_ENABLE);
env_.app().config().VP_REDUCE_RELAY_ENABLE,
false);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();

View File

@@ -353,7 +353,78 @@ public:
}
};
BEAST_DEFINE_TESTSUITE(SHAMap, ripple_app, ripple);
class SHAMapPathProof_test : public beast::unit_test::suite
{
void
run() override
{
test::SuiteJournal journal("SHAMapPathProof_test", *this);
tests::TestNodeFamily tf{journal};
SHAMap map{SHAMapType::FREE, tf};
map.setUnbacked();
uint256 key;
uint256 rootHash;
std::vector<Blob> goodPath;
for (unsigned char c = 1; c < 100; ++c)
{
uint256 k(c);
Blob b(32, c);
map.addItem(SHAMapNodeType::tnACCOUNT_STATE, SHAMapItem{k, b});
map.invariants();
auto root = map.getHash().as_uint256();
auto path = map.getProofPath(k);
BEAST_EXPECT(path);
if (!path)
break;
BEAST_EXPECT(map.verifyProofPath(root, k, *path));
if (c == 1)
{
// extra node
path->insert(path->begin(), path->front());
BEAST_EXPECT(!map.verifyProofPath(root, k, *path));
// wrong key
uint256 wrongKey(c + 1);
BEAST_EXPECT(!map.getProofPath(wrongKey));
}
if (c == 99)
{
key = k;
rootHash = root;
goodPath = std::move(*path);
}
}
// still good
BEAST_EXPECT(map.verifyProofPath(rootHash, key, goodPath));
// empty path
std::vector<Blob> badPath;
BEAST_EXPECT(!map.verifyProofPath(rootHash, key, badPath));
// too long
badPath = goodPath;
badPath.push_back(goodPath.back());
BEAST_EXPECT(!map.verifyProofPath(rootHash, key, badPath));
// bad node
badPath.clear();
badPath.emplace_back(100, 100);
BEAST_EXPECT(!map.verifyProofPath(rootHash, key, badPath));
// bad node type
badPath.clear();
badPath.push_back(goodPath.front());
badPath.front().back()--; // change node type
BEAST_EXPECT(!map.verifyProofPath(rootHash, key, badPath));
// all inner
badPath.clear();
badPath = goodPath;
badPath.erase(badPath.begin());
BEAST_EXPECT(!map.verifyProofPath(rootHash, key, badPath));
}
};
BEAST_DEFINE_TESTSUITE(SHAMap, ripple_app, ripple);
BEAST_DEFINE_TESTSUITE(SHAMapPathProof, ripple_app, ripple);
} // namespace tests
} // namespace ripple