Mirror of https://github.com/Xahau/xahaud.git, synced 2026-02-03 05:25:17 +00:00.

Compare commits: 8 commits, sublimator, ...partial-sy
| Author | SHA1 | Date |
|---|---|---|
| | b3c4e56c26 | |
| | 9ddf649e2a | |
| | aeb2888fe9 | |
| | 8263f39e3e | |
| | dc5ec93207 | |
| | 5d85d2df4b | |
| | c20c948183 | |
| | f6a4e8f36d | |
.github/workflows/xahau-ga-macos.yml (vendored, 2 changes)
```diff
@@ -20,7 +20,7 @@ jobs:
           - Ninja
         configuration:
           - Debug
-    runs-on: macos-15-xlarge
+    runs-on: macos-15
     env:
       build_dir: .build
       # Bump this number to invalidate all caches globally.
```
(File diff suppressed because it is too large.)
```diff
@@ -178,8 +178,26 @@ handleNewValidation(
     auto const outcome =
         validations.add(calcNodeID(masterKey.value_or(signingKey)), val);
 
+    if (j.has_value())
+    {
+        JLOG(j->warn()) << "handleNewValidation: seq=" << seq
+                        << " hash=" << hash << " trusted=" << val->isTrusted()
+                        << " outcome="
+                        << (outcome == ValStatus::current
+                                ? "current"
+                                : outcome == ValStatus::stale
+                                    ? "stale"
+                                    : outcome == ValStatus::badSeq ? "badSeq"
+                                                                   : "other");
+    }
+
     if (outcome == ValStatus::current)
     {
+        // For partial sync: track the network-observed ledger from ANY
+        // validation (not just trusted). This allows queries before
+        // trusted validators are fully configured.
+        app.getLedgerMaster().setNetworkObservedLedger(hash, seq);
+
         if (val->isTrusted())
         {
             // Was: app.getLedgerMaster().checkAccept(hash, seq);
@@ -198,6 +216,23 @@ handleNewValidation(
             app.getLedgerMaster().checkAccept(hash, seq);
         }
     }
+    else
+    {
+        // Partial sync debug: only log untrusted validations during startup
+        // (before we have any validated ledger)
+        auto [lastHash, lastSeq] =
+            app.getLedgerMaster().getLastValidatedLedger();
+        if (lastSeq == 0)
+        {
+            auto jPartialSync = app.journal("PartialSync");
+            auto const quorum = app.validators().quorum();
+            auto const unlSize = app.validators().count();
+            JLOG(jPartialSync.debug())
+                << "validation NOT trusted: seq=" << seq << " hash=" << hash
+                << " unlSize=" << unlSize << " quorum=" << quorum
+                << " (masterKey=" << (masterKey ? "found" : "none") << ")";
+        }
+    }
     return;
 }
 
```
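The nested ternary above maps a `ValStatus` to a log label inline. The same mapping could live in a small helper; a hypothetical sketch, not part of this diff:

```cpp
// Hypothetical helper: name a ValStatus for logging (mirrors the ternary
// chain in handleNewValidation above).
char const*
valStatusName(ValStatus s)
{
    switch (s)
    {
        case ValStatus::current:
            return "current";
        case ValStatus::stale:
            return "stale";
        case ValStatus::badSeq:
            return "badSeq";
        default:
            return "other";
    }
}
```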
```diff
@@ -81,6 +81,13 @@ public:
         return mLedger;
     }
 
+    /** Returns true if we have the ledger header (may still be incomplete). */
+    bool
+    hasHeader() const
+    {
+        return mHaveHeader;
+    }
+
     std::uint32_t
     getSeq() const
     {
@@ -107,6 +114,26 @@ public:
     void
     runData();
 
+    /** Add a node hash to the priority queue for immediate fetching.
+        Used by partial sync mode to prioritize nodes needed by queries.
+    */
+    void
+    addPriorityHash(uint256 const& hash);
+
+    /** Check if a transaction hash has been seen in this ledger's txMap.
+        Used by submit_and_wait to find transactions in partial ledgers.
+    */
+    bool
+    hasTx(uint256 const& txHash) const;
+
+    /** Return the count of known transaction hashes (for debugging). */
+    std::size_t
+    knownTxCount() const
+    {
+        ScopedLockType sl(mtx_);
+        return knownTxHashes_.size();
+    }
+
     void
     touch()
     {
@@ -176,9 +203,11 @@ private:
     clock_type::time_point mLastAction;
 
     std::shared_ptr<Ledger> mLedger;
+    //@@start state-tracking-members
     bool mHaveHeader;
     bool mHaveState;
     bool mHaveTransactions;
+    //@@end state-tracking-members
     bool mSignaled;
     bool mByHash;
     std::uint32_t mSeq;
@@ -186,6 +215,13 @@ private:
 
     std::set<uint256> mRecentNodes;
 
+    // Priority nodes to fetch immediately (for partial sync queries)
+    std::set<uint256> priorityHashes_;
+
+    // Transaction hashes seen in incoming txMap leaf nodes (for
+    // submit_and_wait)
+    std::set<uint256> knownTxHashes_;
+
     SHAMapAddNode mStats;
 
     // Data we have received from peers
```
```diff
@@ -23,6 +23,7 @@
 #include <ripple/app/ledger/InboundLedger.h>
 #include <ripple/protocol/RippleLedgerHash.h>
 #include <memory>
+#include <optional>
 
 namespace ripple {
 
@@ -56,6 +57,45 @@ public:
     virtual std::shared_ptr<InboundLedger>
     find(LedgerHash const& hash) = 0;
 
+    /** Get a partial ledger (has header but may be incomplete).
+        Used for partial sync mode - allows RPC queries against
+        ledgers that are still being acquired.
+        @return The ledger if header exists and not failed, nullptr otherwise.
+    */
+    virtual std::shared_ptr<Ledger const>
+    getPartialLedger(uint256 const& hash) = 0;
+
+    /** Find which partial ledger contains a transaction.
+        Used by submit_and_wait to locate transactions as they appear
+        in incoming ledgers' txMaps.
+        @param txHash The transaction hash to search for
+        @return The ledger hash if found, nullopt otherwise
+    */
+    virtual std::optional<uint256>
+    findTxLedger(uint256 const& txHash) = 0;
+
+    /** Add a priority node hash for immediate fetching.
+        Used by partial sync mode to prioritize specific nodes
+        needed by queries.
+        @param ledgerSeq The ledger sequence being acquired
+        @param nodeHash The specific node hash to prioritize
+    */
+    virtual void
+    addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;
+
+    /** Add a ledger range where TX fetching should be prioritized.
+        Ledgers in this range will fetch TX nodes BEFORE state nodes.
+        Used by submit_and_wait to quickly detect transactions.
+        @param start First ledger sequence (inclusive)
+        @param end Last ledger sequence (inclusive)
+    */
+    virtual void
+    prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;
+
+    /** Check if TX fetching should be prioritized for a ledger sequence. */
+    virtual bool
+    isTxPrioritized(std::uint32_t seq) const = 0;
+
     // VFALCO TODO Remove the dependency on the Peer object.
     //
     virtual bool
```
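Taken together, these hooks let a query path read from a ledger that is still being acquired. A minimal caller sketch, assuming only the interface methods added above (the helper name is hypothetical):

```cpp
// Hypothetical helper (not part of this diff): resolve a transaction against
// partially acquired ledgers using the new InboundLedgers methods.
std::shared_ptr<Ledger const>
lookupTxInPartialLedgers(Application& app, uint256 const& txHash)
{
    auto& inbounds = app.getInboundLedgers();

    // findTxLedger() scans the txMaps of all in-flight InboundLedgers.
    if (auto const ledgerHash = inbounds.findTxLedger(txHash))
    {
        // getPartialLedger() returns the ledger as soon as its header is
        // known, even though state/tx maps may still be incomplete.
        if (auto ledger = inbounds.getPartialLedger(*ledgerHash))
            return ledger;
    }
    return nullptr;
}
```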
```diff
@@ -300,6 +300,38 @@ public:
         return !mValidLedger.empty();
     }
 
+    //! Get the hash/seq of the last validated ledger (even if not resident).
+    std::pair<uint256, LedgerIndex>
+    getLastValidatedLedger()
+    {
+        std::lock_guard lock(m_mutex);
+        return mLastValidLedger;
+    }
+
+    //! For partial sync: set the network-observed ledger from any validation.
+    //! This allows queries before trusted validators are fully configured.
+    void
+    setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq)
+    {
+        std::lock_guard lock(m_mutex);
+        if (seq > mNetworkObservedLedger.second)
+        {
+            JLOG(jPartialSync_.warn())
+                << "network-observed ledger updated to seq=" << seq
+                << " hash=" << hash;
+            mNetworkObservedLedger = std::make_pair(hash, seq);
+        }
+    }
+
+    //! Get the network-observed ledger (from any validations, not just
+    //! trusted).
+    std::pair<uint256, LedgerIndex>
+    getNetworkObservedLedger()
+    {
+        std::lock_guard lock(m_mutex);
+        return mNetworkObservedLedger;
+    }
+
     // Returns the minimum ledger sequence in SQL database, if any.
     std::optional<LedgerIndex>
     minSqlSeq();
@@ -349,6 +381,7 @@ private:
 
     Application& app_;
     beast::Journal m_journal;
+    beast::Journal jPartialSync_;
 
     std::recursive_mutex mutable m_mutex;
 
@@ -373,6 +406,9 @@ private:
     // Fully validated ledger, whether or not we have the ledger resident.
     std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};
 
+    // Network-observed ledger from any validations (for partial sync).
+    std::pair<uint256, LedgerIndex> mNetworkObservedLedger{uint256(), 0};
+
     LedgerHistory mLedgerHistory;
 
     CanonicalTXSet mHeldTransactions{uint256()};
```
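Note that `setNetworkObservedLedger` only advances on a strictly higher sequence, so readers see a monotonic lower bound on what the network is validating. A hedged sketch of the read-side fallback this enables (the function is hypothetical, the accessors are from the diff above):

```cpp
// Hypothetical read path: prefer the fully validated ledger, but fall back
// to the network-observed one while the trusted-validator quorum is still
// being established.
std::pair<uint256, LedgerIndex>
bestKnownLedger(LedgerMaster& lm)
{
    auto validated = lm.getLastValidatedLedger();
    if (validated.second != 0)  // seq 0 means "none yet"
        return validated;
    return lm.getNetworkObservedLedger();
}
```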
```diff
@@ -42,6 +42,7 @@ namespace ripple {
 
 using namespace std::chrono_literals;
 
+//@@start tx-fetch-constants
 enum {
     // Number of peers to start with
     peerCountStart = 5
@@ -70,6 +71,7 @@ enum {
     ,
     reqNodes = 12
 };
+//@@end tx-fetch-constants
 
 // millisecond for each ledger timeout
 auto constexpr ledgerAcquireTimeout = 3000ms;
@@ -99,6 +101,8 @@ InboundLedger::InboundLedger(
     , mPeerSet(std::move(peerSet))
 {
     JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
+    JLOG(app_.journal("TxTrack").warn())
+        << "NEW LEDGER seq=" << seq << " hash=" << hash;
     touch();
 }
 
@@ -190,6 +194,22 @@ InboundLedger::update(std::uint32_t seq)
     touch();
 }
 
+void
+InboundLedger::addPriorityHash(uint256 const& hash)
+{
+    ScopedLockType sl(mtx_);
+    priorityHashes_.insert(hash);
+    JLOG(journal_.debug()) << "Added priority hash " << hash << " for ledger "
+                           << hash_;
+}
+
+bool
+InboundLedger::hasTx(uint256 const& txHash) const
+{
+    ScopedLockType sl(mtx_);
+    return knownTxHashes_.count(txHash) > 0;
+}
+
 bool
 InboundLedger::checkLocal()
 {
@@ -413,6 +433,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
         }
     }
 
+    //@@start completion-check
     if (mHaveTransactions && mHaveState)
     {
         JLOG(journal_.debug()) << "Had everything locally";
@@ -420,6 +441,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
         assert(mLedger->read(keylet::fees()));
         mLedger->setImmutable();
     }
+    //@@end completion-check
 }
 
 /** Called with a lock by the PeerSet when the timer expires
@@ -586,6 +608,43 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         }
     }
 
+    // Handle priority hashes immediately (for partial sync queries)
+    if (mHaveHeader && !priorityHashes_.empty())
+    {
+        JLOG(journal_.warn()) << "PRIORITY: trigger() sending "
+                              << priorityHashes_.size() << " priority requests";
+
+        protocol::TMGetObjectByHash tmBH;
+        tmBH.set_query(true);
+        tmBH.set_type(protocol::TMGetObjectByHash::otSTATE_NODE);
+        tmBH.set_ledgerhash(hash_.begin(), hash_.size());
+
+        for (auto const& h : priorityHashes_)
+        {
+            JLOG(journal_.warn()) << "PRIORITY: requesting node " << h;
+            protocol::TMIndexedObject* io = tmBH.add_objects();
+            io->set_hash(h.begin(), h.size());
+            if (mSeq != 0)
+                io->set_ledgerseq(mSeq);
+        }
+
+        // Send to all peers in our peer set
+        auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
+        auto const& peerIds = mPeerSet->getPeerIds();
+        std::size_t sentCount = 0;
+        for (auto id : peerIds)
+        {
+            if (auto p = app_.overlay().findPeerByShortID(id))
+            {
+                p->send(packet);
+                ++sentCount;
+            }
+        }
+        JLOG(journal_.warn()) << "PRIORITY: sent to " << sentCount << " peers";
+
+        priorityHashes_.clear();
+    }
+
     protocol::TMGetLedger tmGL;
     tmGL.set_ledgerhash(hash_.begin(), hash_.size());
 
@@ -679,7 +738,12 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
 
     // Get the state data first because it's the most likely to be useful
     // if we wind up abandoning this fetch.
-    if (mHaveHeader && !mHaveState && !failed_)
+    // When TX is prioritized for this ledger range, skip state until TX
+    // complete.
+    bool const txPrioritized =
+        mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq);
+    if (mHaveHeader && !mHaveState && !failed_ &&
+        !(txPrioritized && !mHaveTransactions))
     {
         assert(mLedger);
 
@@ -898,6 +962,9 @@ InboundLedger::takeHeader(std::string const& data)
     mLedger->txMap().setLedgerSeq(mSeq);
     mHaveHeader = true;
 
+    JLOG(app_.journal("TxTrack").warn())
+        << "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash;
+
     Serializer s(data.size() + 4);
     s.add32(HashPrefix::ledgerMaster);
     s.addRaw(data.data(), data.size());
@@ -967,6 +1034,33 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
         if (!nodeID)
             throw std::runtime_error("data does not properly deserialize");
 
+        // For TX nodes, extract tx hash from leaf nodes for submit_and_wait
+        if (packet.type() == protocol::liTX_NODE)
+        {
+            auto const& data = node.nodedata();
+            // Leaf nodes have wire type as last byte
+            // Format: [tx+meta data...][32-byte tx hash][1-byte type]
+            if (data.size() >= 33)
+            {
+                uint8_t wireType =
+                    static_cast<uint8_t>(data[data.size() - 1]);
+                // wireTypeTransactionWithMeta = 4
+                if (wireType == 4)
+                {
+                    uint256 txHash;
+                    std::memcpy(
+                        txHash.data(), data.data() + data.size() - 33, 32);
+                    auto [it, inserted] = knownTxHashes_.insert(txHash);
+                    if (inserted)
+                    {
+                        JLOG(app_.journal("TxTrack").warn())
+                            << "GOT TX ledger=" << mSeq << " tx=" << txHash
+                            << " count=" << knownTxHashes_.size();
+                    }
+                }
+            }
+        }
+
         if (nodeID->isRoot())
         {
             san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
```
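The leaf-node parse above relies on the serialized layout the diff documents: the item's key (the transaction hash) occupies the 32 bytes immediately before the trailing wire-type byte. A standalone sketch of that extraction, assuming the same layout:

```cpp
#include <array>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

// Extract the trailing 32-byte key from a serialized SHAMap leaf blob laid
// out as [payload...][32-byte key][1-byte wire type]. Returns nullopt when
// the blob is too short or the type is not transaction-with-metadata (4).
std::optional<std::array<std::uint8_t, 32>>
leafTxHash(std::vector<std::uint8_t> const& blob)
{
    if (blob.size() < 33)
        return std::nullopt;
    if (blob.back() != 4)  // wireTypeTransactionWithMeta, per the diff comment
        return std::nullopt;
    std::array<std::uint8_t, 32> key{};
    std::memcpy(key.data(), blob.data() + blob.size() - 33, 32);
    return key;
}
```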
```diff
@@ -23,6 +23,7 @@
 #include <ripple/app/misc/NetworkOPs.h>
 #include <ripple/basics/DecayingSample.h>
 #include <ripple/basics/Log.h>
+#include <ripple/basics/RangeSet.h>
 #include <ripple/beast/container/aged_map.h>
 #include <ripple/beast/core/LexicalCast.h>
 #include <ripple/core/JobQueue.h>
@@ -193,6 +194,89 @@ public:
         return ret;
     }
 
+    std::shared_ptr<Ledger const>
+    getPartialLedger(uint256 const& hash) override
+    {
+        auto inbound = find(hash);
+        if (inbound && inbound->hasHeader() && !inbound->isFailed())
+            return inbound->getLedger();
+        return nullptr;
+    }
+
+    std::optional<uint256>
+    findTxLedger(uint256 const& txHash) override
+    {
+        auto const swj = app_.journal("SubmitAndWait");
+        ScopedLockType sl(mLock);
+        JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching "
+                          << mLedgers.size() << " inbound ledgers";
+        for (auto const& [hash, inbound] : mLedgers)
+        {
+            bool hasHdr = inbound->hasHeader();
+            bool failed = inbound->isFailed();
+            bool hasTx = hasHdr && !failed && inbound->hasTx(txHash);
+            JLOG(swj.trace())
+                << "findTxLedger checking ledger seq=" << inbound->getSeq()
+                << " hash=" << hash << " hasHeader=" << hasHdr
+                << " failed=" << failed << " hasTx=" << hasTx;
+            if (hasTx)
+            {
+                JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash
+                                 << " in ledger seq=" << inbound->getSeq();
+                return hash;
+            }
+        }
+        JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND";
+        return std::nullopt;
+    }
+
+    void
+    addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
+    {
+        std::shared_ptr<InboundLedger> inbound;
+        {
+            ScopedLockType sl(mLock);
+            // Find inbound ledger by sequence (need to iterate)
+            for (auto const& [hash, ledger] : mLedgers)
+            {
+                if (ledger->getSeq() == ledgerSeq && !ledger->isFailed() &&
+                    !ledger->isComplete())
+                {
+                    inbound = ledger;
+                    break;
+                }
+            }
+        }
+
+        if (inbound)
+        {
+            inbound->addPriorityHash(nodeHash);
+            JLOG(j_.warn()) << "PRIORITY: added node " << nodeHash
+                            << " for ledger seq " << ledgerSeq;
+        }
+        else
+        {
+            JLOG(j_.warn()) << "PRIORITY: no inbound ledger for seq "
+                            << ledgerSeq << " (node " << nodeHash << ")";
+        }
+    }
+
+    void
+    prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
+    {
+        std::lock_guard lock(txPriorityMutex_);
+        txPriorityRange_.insert(ClosedInterval<std::uint32_t>(start, end));
+        JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-"
+                         << end;
+    }
+
+    bool
+    isTxPrioritized(std::uint32_t seq) const override
+    {
+        std::lock_guard lock(txPriorityMutex_);
+        return boost::icl::contains(txPriorityRange_, seq);
+    }
+
     /*
     This gets called when
         "We got some data from an inbound ledger"
@@ -411,6 +495,11 @@ public:
         }
         else if ((la + std::chrono::minutes(1)) < start)
         {
+            JLOG(app_.journal("SubmitAndWait").debug())
+                << "sweep removing ledger seq=" << it->second->getSeq()
+                << " complete=" << it->second->isComplete()
+                << " failed=" << it->second->isFailed()
+                << " knownTxCount=" << it->second->knownTxCount();
             stuffToSweep.push_back(it->second);
             // shouldn't cause the actual final delete
             // since we are holding a reference in the vector.
@@ -425,8 +514,8 @@ public:
         beast::expire(mRecentFailures, kReacquireInterval);
     }
 
-    JLOG(j_.debug())
-        << "Swept " << stuffToSweep.size() << " out of " << total
+    JLOG(app_.journal("SubmitAndWait").debug())
+        << "sweep removed " << stuffToSweep.size() << " out of " << total
         << " inbound ledgers. Duration: "
         << std::chrono::duration_cast<std::chrono::milliseconds>(
               m_clock.now() - start)
@@ -461,6 +550,10 @@ private:
 
     std::set<uint256> pendingAcquires_;
     std::mutex acquiresMutex_;
+
+    // Ledger ranges where TX fetching should be prioritized
+    mutable std::mutex txPriorityMutex_;
+    RangeSet<std::uint32_t> txPriorityRange_;
 };
 
 //------------------------------------------------------------------------------
```
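`prioritizeTxForLedgers`/`isTxPrioritized` lean on Boost.ICL through rippled's `RangeSet` alias: inserting a closed interval marks an inclusive ledger range, and `contains()` answers membership. A minimal illustration using Boost.ICL directly, mirroring the two calls above (a sketch, assuming rippled's `RangeSet<T> = boost::icl::interval_set<T, std::less, ClosedInterval<T>>` definition):

```cpp
#include <boost/icl/closed_interval.hpp>
#include <boost/icl/interval_set.hpp>
#include <cstdint>
#include <iostream>

int main()
{
    // Interval set over closed (inclusive) intervals, like RangeSet.
    boost::icl::interval_set<
        std::uint32_t,
        std::less,
        boost::icl::closed_interval<std::uint32_t>>
        txPriorityRange;

    // prioritizeTxForLedgers(100, 120)
    txPriorityRange.insert(
        boost::icl::closed_interval<std::uint32_t>(100, 120));

    // isTxPrioritized(seq)
    std::cout << boost::icl::contains(txPriorityRange, 110u) << '\n';  // 1
    std::cout << boost::icl::contains(txPriorityRange, 121u) << '\n';  // 0
}
```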
```diff
@@ -187,6 +187,7 @@ LedgerMaster::LedgerMaster(
     beast::Journal journal)
     : app_(app)
     , m_journal(journal)
+    , jPartialSync_(app.journal("PartialSync"))
     , mLedgerHistory(collector, app)
     , standalone_(app_.config().standalone())
     , fetch_depth_(
@@ -1009,11 +1010,29 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq)
     auto validations = app_.validators().negativeUNLFilter(
         app_.getValidations().getTrustedForLedger(hash, seq));
     valCount = validations.size();
-    if (valCount >= app_.validators().quorum())
+    auto const quorum = app_.validators().quorum();
+
+    JLOG(jPartialSync_.warn())
+        << "checkAccept: hash=" << hash << " seq=" << seq
+        << " valCount=" << valCount << " quorum=" << quorum
+        << " mLastValidLedger.seq=" << mLastValidLedger.second;
+
+    if (valCount >= quorum)
     {
         std::lock_guard ml(m_mutex);
         if (seq > mLastValidLedger.second)
+        {
+            JLOG(jPartialSync_.warn())
+                << "checkAccept: QUORUM REACHED - setting mLastValidLedger"
+                << " seq=" << seq << " hash=" << hash;
             mLastValidLedger = std::make_pair(hash, seq);
+        }
+    }
+    else
+    {
+        JLOG(jPartialSync_.debug())
+            << "checkAccept: quorum not reached, need " << quorum
+            << " have " << valCount;
     }
 
     if (seq == mValidLedgerSeq)
```
```diff
@@ -216,6 +216,9 @@ public:
         bool bLocal,
         FailHard failType) override;
 
+    std::optional<uint256>
+    broadcastRawTransaction(Blob const& txBlob) override;
+
     /**
      * For transactions submitted directly by a client, apply batch of
      * transactions and wait for this transaction to complete.
@@ -819,11 +822,13 @@ NetworkOPsImp::isNeedNetworkLedger()
     return needNetworkLedger_;
 }
 
+//@@start is-full-check
 inline bool
 NetworkOPsImp::isFull()
 {
     return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
 }
+//@@end is-full-check
 
 std::string
 NetworkOPsImp::getHostId(bool forAdmin)
@@ -1193,6 +1198,43 @@ NetworkOPsImp::processTransaction(
         doTransactionAsync(transaction, bUnlimited, failType);
 }
 
+std::optional<uint256>
+NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob)
+{
+    // Parse the transaction blob to get the hash
+    std::shared_ptr<STTx const> stx;
+    try
+    {
+        SerialIter sit(makeSlice(txBlob));
+        stx = std::make_shared<STTx const>(std::ref(sit));
+    }
+    catch (std::exception const& e)
+    {
+        JLOG(m_journal.warn())
+            << "broadcastRawTransaction: Failed to parse tx blob: " << e.what();
+        return std::nullopt;
+    }
+
+    uint256 txHash = stx->getTransactionID();
+
+    // Broadcast to all peers without local validation
+    protocol::TMTransaction msg;
+    Serializer s;
+    stx->add(s);
+    msg.set_rawtransaction(s.data(), s.size());
+    msg.set_status(protocol::tsNEW);  // tsNEW = origin node could not validate
+    msg.set_receivetimestamp(
+        app_.timeKeeper().now().time_since_epoch().count());
+
+    app_.overlay().foreach(
+        send_always(std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
+
+    JLOG(m_journal.info()) << "broadcastRawTransaction: Broadcast tx "
+                           << txHash;
+
+    return txHash;
+}
+
 void
 NetworkOPsImp::doTransactionAsync(
     std::shared_ptr<Transaction> transaction,
@@ -1459,6 +1501,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
         bool const isEmitted =
             hook::isEmittedTxn(*(e.transaction->getSTransaction()));
 
+        //@@start tx-relay
         if (toSkip && !isEmitted)
         {
             protocol::TMTransaction tx;
@@ -1474,6 +1517,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
             app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
             e.transaction->setBroadcast();
         }
+        //@@end tx-relay
     }
 
     if (validatedLedgerIndex)
@@ -1700,6 +1744,14 @@ NetworkOPsImp::checkLastClosedLedger(
     if (!switchLedgers)
         return false;
 
+    // Safety check: can't acquire a ledger with an invalid hash
+    if (!closedLedger.isNonZero())
+    {
+        JLOG(m_journal.warn())
+            << "checkLastClosedLedger: closedLedger hash is zero, skipping";
+        return false;
+    }
+
     auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
 
     if (!consensus)
@@ -1903,6 +1955,7 @@ NetworkOPsImp::endConsensus()
     // timing to make sure there shouldn't be a newer LCL. We need this
     // information to do the next three tests.
 
+    //@@start mode-transitions
     if (((mMode == OperatingMode::CONNECTED) ||
          (mMode == OperatingMode::SYNCING)) &&
         !ledgerChange)
@@ -1928,8 +1981,11 @@ NetworkOPsImp::endConsensus()
             setMode(OperatingMode::FULL);
         }
     }
+    //@@end mode-transitions
 
+    //@@start consensus-gate
     beginConsensus(networkClosed);
+    //@@end consensus-gate
 }
 
 void
```
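`broadcastRawTransaction` deliberately skips the local apply/validate path: it deserializes the blob only to recover the hash, then floods the signed bytes to every peer. A hedged caller sketch (the function and its control flow are hypothetical; the method is from this diff):

```cpp
// Hypothetical call site: push a pre-signed transaction straight to peers
// and keep its hash for later polling. `netOps` is a NetworkOPs&.
void
submitRaw(NetworkOPs& netOps, Blob const& signedTxBlob)
{
    if (auto const txHash = netOps.broadcastRawTransaction(signedTxBlob))
    {
        // Sent: poll findTxLedger(*txHash) and validation counts later.
    }
    else
    {
        // The blob did not deserialize as an STTx; nothing was sent.
    }
}
```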
```diff
@@ -32,6 +32,7 @@
 #include <boost/asio.hpp>
 #include <deque>
 #include <memory>
+#include <optional>
 #include <tuple>
 
 namespace ripple {
@@ -112,6 +113,17 @@ public:
         bool bLocal,
         FailHard failType) = 0;
 
+    /**
+     * Broadcast a raw transaction to peers without local validation.
+     * Used by submit_and_wait during partial sync mode when local state
+     * is not available for validation.
+     *
+     * @param txBlob The raw serialized transaction blob
+     * @return The transaction hash, or nullopt if parsing failed
+     */
+    virtual std::optional<uint256>
+    broadcastRawTransaction(Blob const& txBlob) = 0;
+
     //--------------------------------------------------------------------------
     //
     // Owner functions
```
```diff
@@ -31,6 +31,7 @@ namespace ripple {
     not change them without verifying each use and ensuring that it is
     not a breaking change.
 */
+//@@start operating-mode-enum
 enum class OperatingMode {
     DISCONNECTED = 0,  //!< not ready to process requests
     CONNECTED = 1,     //!< convinced we are talking to the network
@@ -38,6 +39,7 @@ enum class OperatingMode {
     TRACKING = 3,  //!< convinced we agree with the network
     FULL = 4       //!< we have the ledger and can even validate
 };
+//@@end operating-mode-enum
 
 class StateAccounting
 {
```
```diff
@@ -471,6 +471,10 @@ ManifestCache::applyManifest(Manifest m)
 
     auto masterKey = m.masterKey;
     map_.emplace(std::move(masterKey), std::move(m));
+
+    // Something has changed. Keep track of it.
+    seq_++;
+
     return ManifestDisposition::accepted;
 }
 
```
```diff
@@ -895,6 +895,16 @@ ValidatorList::applyListsAndBroadcast(
         if (good)
         {
             networkOPs.clearUNLBlocked();
+            // For partial sync: trigger early quorum calculation so
+            // validations can be trusted before consensus starts
+            JLOG(j_.warn()) << "All publisher lists available, triggering "
+                               "early updateTrusted for partial sync";
+            updateTrusted(
+                {},  // empty seenValidators - we just need quorum calculated
+                timeKeeper_.now(),
+                networkOPs,
+                overlay,
+                hashRouter);
         }
     }
     bool broadcast = disposition <= ListDisposition::known_sequence;
```
```diff
@@ -166,6 +166,7 @@ ValidatorSite::load(
 void
 ValidatorSite::start()
 {
+    JLOG(j_.warn()) << "ValidatorSite::start() called";
     std::lock_guard l0{sites_mutex_};
     std::lock_guard l1{state_mutex_};
     if (timer_.expires_at() == clock_type::time_point{})
@@ -218,6 +219,11 @@ ValidatorSite::setTimer(
     if (next != sites_.end())
     {
         pending_ = next->nextRefresh <= clock_type::now();
+        auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
+            next->nextRefresh - clock_type::now());
+        JLOG(j_.warn()) << "ValidatorSite::setTimer() pending=" << pending_
+                        << " delay=" << delay.count() << "ms"
+                        << " uri=" << next->startingResource->uri;
         cv_.notify_all();
         timer_.expires_at(next->nextRefresh);
         auto idx = std::distance(sites_.begin(), next);
@@ -225,6 +231,10 @@ ValidatorSite::setTimer(
             this->onTimer(idx, ec);
         });
     }
+    else
+    {
+        JLOG(j_.warn()) << "ValidatorSite::setTimer() no sites configured";
+    }
 }
 
 void
@@ -339,6 +349,8 @@ ValidatorSite::onRequestTimeout(std::size_t siteIdx, error_code const& ec)
 void
 ValidatorSite::onTimer(std::size_t siteIdx, error_code const& ec)
 {
+    JLOG(j_.warn()) << "ValidatorSite::onTimer() fired for site " << siteIdx
+                    << " ec=" << ec.message();
     if (ec)
     {
         // Restart the timer if any errors are encountered, unless the error
```
```diff
@@ -21,6 +21,7 @@
 #define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
 
 #include <boost/thread/tss.hpp>
+#include <chrono>
 #include <memory>
 #include <unordered_map>
 
@@ -33,6 +34,12 @@ struct LocalValues
     explicit LocalValues() = default;
 
     bool onCoro = true;
+    void* coroPtr = nullptr;  // Pointer to owning JobQueue::Coro (if any)
+
+    // Configurable timeout for SHAMap node fetching during partial sync.
+    // Zero means use the default (30s). RPC handlers can set this to
+    // customize poll-wait behavior.
+    std::chrono::milliseconds fetchTimeout{0};
 
     struct BasicValue
     {
@@ -127,6 +134,38 @@ LocalValue<T>::operator*()
             .emplace(this, std::make_unique<detail::LocalValues::Value<T>>(t_))
             .first->second->get());
 }
 
+// Returns pointer to current coroutine if running inside one, nullptr otherwise
+inline void*
+getCurrentCoroPtr()
+{
+    auto lvs = detail::getLocalValues().get();
+    if (lvs && lvs->onCoro)
+        return lvs->coroPtr;
+    return nullptr;
+}
+
+// Get the configured fetch timeout for current coroutine context.
+// Returns 0ms if not in a coroutine or no custom timeout set.
+inline std::chrono::milliseconds
+getCoroFetchTimeout()
+{
+    auto lvs = detail::getLocalValues().get();
+    if (lvs && lvs->onCoro)
+        return lvs->fetchTimeout;
+    return std::chrono::milliseconds{0};
+}
+
+// Set the fetch timeout for the current coroutine context.
+// Only works if called from within a coroutine.
+inline void
+setCoroFetchTimeout(std::chrono::milliseconds timeout)
+{
+    auto lvs = detail::getLocalValues().get();
+    if (lvs && lvs->onCoro)
+        lvs->fetchTimeout = timeout;
+}
+
 } // namespace ripple
 
 #endif
```
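These helpers give coroutine-scoped state without threading a timeout parameter through every SHAMap call: a handler sets the timeout once on entry, and deep fetch code reads it back from the same thread-specific LocalValues block. A usage sketch (the handler body is hypothetical; the helper names are from this diff):

```cpp
// Sketch of a coroutine-run handler body using the new LocalValue helpers.
void
handlerBody()
{
    using namespace std::chrono_literals;

    // Only effective when running on a JobQueue coroutine (onCoro == true).
    setCoroFetchTimeout(15s);

    // ... later, deep inside SHAMap fetch code, far from the handler:
    auto timeout = getCoroFetchTimeout();
    if (timeout == 0ms)
        timeout = 30s;  // fall back to the default noted in LocalValues
    // wait up to `timeout` for the missing node ...
}
```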
```diff
@@ -21,6 +21,7 @@
 #define RIPPLE_CORE_COROINL_H_INCLUDED
 
 #include <ripple/basics/ByteUtilities.h>
+#include <thread>
 
 namespace ripple {
 
@@ -48,6 +49,7 @@ JobQueue::Coro::Coro(
     },
     boost::coroutines::attributes(megabytes(1)))
 {
+    lvs_.coroPtr = this;
 }
 
 inline JobQueue::Coro::~Coro()
@@ -57,6 +59,7 @@ inline JobQueue::Coro::~Coro()
 #endif
 }
 
+//@@start coro-yield
 inline void
 JobQueue::Coro::yield() const
 {
@@ -66,6 +69,7 @@ JobQueue::Coro::yield() const
     }
     (*yield_)();
 }
+//@@end coro-yield
 
 inline bool
 JobQueue::Coro::post()
@@ -89,6 +93,7 @@ JobQueue::Coro::post()
     return false;
 }
 
+//@@start coro-resume
 inline void
 JobQueue::Coro::resume()
 {
@@ -111,6 +116,7 @@ JobQueue::Coro::resume()
     running_ = false;
     cv_.notify_all();
 }
+//@@end coro-resume
 
 inline bool
 JobQueue::Coro::runnable() const
@@ -146,6 +152,62 @@ JobQueue::Coro::join()
     cv_.wait(lk, [this]() { return running_ == false; });
 }
 
+inline bool
+JobQueue::Coro::postAndYield()
+{
+    {
+        std::lock_guard lk(mutex_run_);
+        running_ = true;
+    }
+
+    // Flag starts false - will be set true right before yield()
+    yielding_.store(false, std::memory_order_release);
+
+    // Post a job that waits for yield to be ready, then resumes
+    if (!jq_.addJob(
+            type_, name_, [this, sp = shared_from_this()]() {
+                // Spin-wait until yield() is about to happen
+                // yielding_ is set true immediately before (*yield_)() is called
+                while (!yielding_.load(std::memory_order_acquire))
+                    std::this_thread::yield();
+                resume();
+            }))
+    {
+        std::lock_guard lk(mutex_run_);
+        running_ = false;
+        cv_.notify_all();
+        return false;
+    }
+
+    // Signal that we're about to yield, then yield
+    yielding_.store(true, std::memory_order_release);
+    yield();
+
+    // Clear flag after resuming
+    yielding_.store(false, std::memory_order_release);
+    return true;
+}
+
+inline bool
+JobQueue::Coro::sleepFor(std::chrono::milliseconds delay)
+{
+    {
+        std::lock_guard lk(mutex_run_);
+        running_ = true;
+    }
+
+    // Create a detached thread that sleeps and then posts resume job
+    // This frees up the job queue thread during the sleep
+    std::thread([sp = shared_from_this(), delay]() {
+        std::this_thread::sleep_for(delay);
+        // Post a job to resume the coroutine
+        sp->post();
+    }).detach();
+
+    yield();
+    return true;
+}
+
 } // namespace ripple
 
 #endif
```
|||||||
@@ -29,6 +29,7 @@
|
|||||||
#include <boost/coroutine/all.hpp>
|
#include <boost/coroutine/all.hpp>
|
||||||
#include <boost/range/begin.hpp> // workaround for boost 1.72 bug
|
#include <boost/range/begin.hpp> // workaround for boost 1.72 bug
|
||||||
#include <boost/range/end.hpp> // workaround for boost 1.72 bug
|
#include <boost/range/end.hpp> // workaround for boost 1.72 bug
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
|
|
||||||
@@ -69,6 +70,7 @@ public:
|
|||||||
std::condition_variable cv_;
|
std::condition_variable cv_;
|
||||||
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
|
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
|
||||||
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
|
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
|
||||||
|
std::atomic<bool> yielding_{false}; // For postAndYield synchronization
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
bool finished_ = false;
|
bool finished_ = false;
|
||||||
#endif
|
#endif
|
||||||
@@ -136,6 +138,22 @@ public:
|
|||||||
/** Waits until coroutine returns from the user function. */
|
/** Waits until coroutine returns from the user function. */
|
||||||
void
|
void
|
||||||
join();
|
join();
|
||||||
|
|
||||||
|
/** Combined post and yield for poll-wait patterns.
|
||||||
|
Safely schedules resume before yielding, avoiding race conditions.
|
||||||
|
@return true if successfully posted and yielded, false if job queue
|
||||||
|
stopping.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
postAndYield();
|
||||||
|
|
||||||
|
/** Sleep for a duration without blocking the job queue thread.
|
||||||
|
Yields the coroutine and schedules resume after the delay.
|
||||||
|
@param delay The duration to sleep.
|
||||||
|
@return true if successfully slept, false if job queue stopping.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
sleepFor(std::chrono::milliseconds delay);
|
||||||
};
|
};
|
||||||
|
|
||||||
using JobFunction = std::function<void()>;
|
using JobFunction = std::function<void()>;
|
||||||
|
|||||||
```diff
@@ -2751,6 +2751,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
         bool pLDo = true;
         bool progress = false;
 
+        // For state/transaction node requests, store directly to db
+        // (not fetch pack) so partial sync queries can find them immediately
+        bool const directStore =
+            packet.type() == protocol::TMGetObjectByHash::otSTATE_NODE ||
+            packet.type() == protocol::TMGetObjectByHash::otTRANSACTION_NODE;
+
         for (int i = 0; i < packet.objects_size(); ++i)
         {
             const protocol::TMIndexedObject& obj = packet.objects(i);
@@ -2783,10 +2789,33 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
                 {
                     uint256 const hash{obj.hash()};
 
-                    app_.getLedgerMaster().addFetchPack(
-                        hash,
-                        std::make_shared<Blob>(
-                            obj.data().begin(), obj.data().end()));
+                    if (directStore)
+                    {
+                        // Store directly to node store for immediate
+                        // availability
+                        auto const hotType =
+                            (packet.type() ==
+                             protocol::TMGetObjectByHash::otSTATE_NODE)
+                            ? hotACCOUNT_NODE
+                            : hotTRANSACTION_NODE;
+
+                        JLOG(p_journal_.warn())
+                            << "PRIORITY: received node " << hash << " for seq "
+                            << pLSeq << " storing to db";
+
+                        app_.getNodeStore().store(
+                            hotType,
+                            Blob(obj.data().begin(), obj.data().end()),
+                            hash,
+                            pLSeq);
+                    }
+                    else
+                    {
+                        app_.getLedgerMaster().addFetchPack(
+                            hash,
+                            std::make_shared<Blob>(
+                                obj.data().begin(), obj.data().end()));
+                    }
                 }
             }
         }
```
```diff
@@ -61,10 +61,12 @@ enum error_code_i {
     rpcAMENDMENT_BLOCKED = 14,
 
     // Networking
+    //@@start network-error-codes
     rpcNO_CLOSED = 15,
     rpcNO_CURRENT = 16,
     rpcNO_NETWORK = 17,
     rpcNOT_SYNCED = 18,
+    //@@end network-error-codes
 
     // Ledger state
     rpcACT_NOT_FOUND = 19,
```
|
|||||||
{rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
|
{rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
|
||||||
{rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
|
{rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
|
||||||
{rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
|
{rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
|
||||||
|
//@@start network-error-messages
|
||||||
{rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
|
{rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
|
||||||
{rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
|
{rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
|
||||||
{rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
|
{rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
|
||||||
|
//@@end network-error-messages
|
||||||
{rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
|
{rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
|
||||||
{rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
|
{rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
|
||||||
{rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},
|
{rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},
|
||||||
|
|||||||
```diff
@@ -147,6 +147,8 @@ doSubmit(RPC::JsonContext&);
 Json::Value
 doSubmitMultiSigned(RPC::JsonContext&);
 Json::Value
+doSubmitAndWait(RPC::JsonContext&);
+Json::Value
 doSubscribe(RPC::JsonContext&);
 Json::Value
 doTransactionEntry(RPC::JsonContext&);
```
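The handler declared here is implemented in the new SubmitAndWait.cpp listed next. Per its header comment, a request carries a hex `tx_blob` and an optional `timeout`; a hedged client-side sketch building such params with the codebase's Json::Value (the function name is hypothetical):

```cpp
// Hypothetical construction of submit_and_wait request parameters
// (field names and limits from the handler's header comment).
Json::Value
makeSubmitAndWaitParams(std::string const& signedTxHex)
{
    Json::Value params;
    params["tx_blob"] = signedTxHex;  // hex-encoded signed transaction
    params["timeout"] = 30;           // seconds; optional, default 60, max 120
    return params;
}
```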
src/ripple/rpc/handlers/SubmitAndWait.cpp (new file, 329 lines)

```cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2024 XRPL Labs

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/app/consensus/RCLValidations.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/basics/LocalValue.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/DeliveredAmount.h>
#include <ripple/rpc/impl/RPCHelpers.h>

namespace ripple {

// Custom journal partition for submit_and_wait debugging
// Configure with [rpc_startup] { "command": "log_level", "partition":
// "SubmitAndWait", "severity": "debug" }
#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level())

// {
//   tx_blob: <hex-encoded signed transaction>
//   timeout: <optional, max wait time in seconds, default 60>
// }
//
// Submit a transaction and wait for it to appear in a VALIDATED ledger.
// Designed for partial sync mode where the node may not have full state
// to validate locally - broadcasts raw transaction and monitors incoming
// ledgers for the result.
//
// The handler waits until:
//   1. Transaction is found in a ledger, AND
//   2. That ledger reaches validation quorum (enough trusted validators)
//
// Response:
//   "validated": true - Transaction confirmed in validated ledger
//   "error": "timeout" - Timeout waiting
//   "error": "expired" - LastLedgerSequence exceeded
Json::Value
doSubmitAndWait(RPC::JsonContext& context)
{
    Json::Value jvResult;

    // Must have coroutine for polling
    if (!context.coro)
    {
        return RPC::make_error(
            rpcINTERNAL, "submit_and_wait requires coroutine context");
    }

    // Parse tx_blob
    if (!context.params.isMember(jss::tx_blob))
    {
        return rpcError(rpcINVALID_PARAMS);
    }

    auto const txBlobHex = context.params[jss::tx_blob].asString();
    auto const txBlob = strUnHex(txBlobHex);

    if (!txBlob || txBlob->empty())
    {
        return rpcError(rpcINVALID_PARAMS);
    }

    // Parse the transaction to get hash and LastLedgerSequence
    std::shared_ptr<STTx const> stx;
    try
    {
        SerialIter sit(makeSlice(*txBlob));
        stx = std::make_shared<STTx const>(std::ref(sit));
    }
    catch (std::exception& e)
    {
        jvResult[jss::error] = "invalidTransaction";
        jvResult[jss::error_exception] = e.what();
        return jvResult;
    }

    uint256 const txHash = stx->getTransactionID();

    // Extract LastLedgerSequence if present
    std::optional<std::uint32_t> lastLedgerSeq;
    if (stx->isFieldPresent(sfLastLedgerSequence))
    {
        lastLedgerSeq = stx->getFieldU32(sfLastLedgerSequence);
    }

    // Parse timeout (default 60 seconds, max 120 seconds)
    auto timeout = std::chrono::seconds(60);
    if (context.params.isMember("timeout"))
    {
        auto const t = context.params["timeout"].asUInt();
        if (t > 120)
        {
            return RPC::make_error(
                rpcINVALID_PARAMS, "timeout must be <= 120 seconds");
        }
        timeout = std::chrono::seconds(t);
    }

    // Set coroutine-local fetch timeout for SHAMap operations
    setCoroFetchTimeout(
        std::chrono::duration_cast<std::chrono::milliseconds>(timeout / 2));

    SWLOG(warn) << "starting for tx=" << txHash
                << " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0)
                << " timeout=" << timeout.count() << "s";

    // Poll for the transaction result
    constexpr auto pollInterval = std::chrono::milliseconds(10);
    auto const startTime = std::chrono::steady_clock::now();

    // Broadcast IMMEDIATELY - don't wait for anything
    SWLOG(warn) << "broadcasting tx=" << txHash;
    auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob);
    if (!broadcastResult)
    {
        SWLOG(warn) << "broadcast FAILED for tx=" << txHash;
        jvResult[jss::error] = "broadcastFailed";
        jvResult[jss::error_exception] =
            "Failed to parse/broadcast transaction";
        return jvResult;
    }
    SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash;

    // Prioritize TX fetching for ledgers in our window
    // This makes TX nodes fetch before state nodes for faster detection
    auto const startSeq = context.ledgerMaster.getValidLedgerIndex();
    auto const endSeq = lastLedgerSeq.value_or(startSeq + 20);
    context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq);

    jvResult[jss::tx_hash] = to_string(txHash);
    jvResult[jss::broadcast] = true;

    // Track when we find the tx and in which ledger
    std::optional<uint256> foundLedgerHash;
    std::optional<std::uint32_t> foundLedgerSeq;

    // Helper to check if a ledger is validated (has quorum)
    auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool {
        auto const quorum = context.app.validators().quorum();
        if (quorum == 0)
            return false;  // No validators configured

        auto const valCount =
            context.app.getValidations().numTrustedForLedger(ledgerHash);

        return valCount >= quorum;
    };

    // Helper to read tx result from a ledger
    auto readTxResult = [&](std::shared_ptr<Ledger const> const& ledger,
                            std::string const& source) -> bool {
        if (!ledger)
            return false;

        auto [sttx, stobj] = ledger->txRead(txHash);
        if (!sttx || !stobj)
            return false;

        jvResult[jss::status] = "success";
        jvResult[jss::validated] = true;
        jvResult["found_via"] = source;
        jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none);
        jvResult[jss::metadata] = stobj->getJson(JsonOptions::none);
        jvResult[jss::ledger_hash] = to_string(ledger->info().hash);
        jvResult[jss::ledger_index] = ledger->info().seq;

        // Extract result code from metadata
        if (stobj->isFieldPresent(sfTransactionResult))
        {
            auto const result =
                TER::fromInt(stobj->getFieldU8(sfTransactionResult));
            std::string token;
            std::string human;
            transResultInfo(result, token, human);
            jvResult[jss::engine_result] = token;
            jvResult[jss::engine_result_code] = TERtoInt(result);
            jvResult[jss::engine_result_message] = human;
        }

        return true;
    };

    while (true)
    {
        auto const elapsed = std::chrono::steady_clock::now() - startTime;
        if (elapsed >= timeout)
        {
            jvResult[jss::error] = "transactionTimeout";
            jvResult[jss::error_message] =
                "Transaction not validated within timeout period";
            if (foundLedgerSeq)
            {
                jvResult["found_in_ledger"] = *foundLedgerSeq;
                auto const valCount =
                    context.app.getValidations().numTrustedForLedger(
                        *foundLedgerHash);
                auto const quorum = context.app.validators().quorum();
                jvResult["validation_count"] =
                    static_cast<unsigned int>(valCount);
                jvResult["quorum"] = static_cast<unsigned int>(quorum);
            }
            return jvResult;
        }

        // If we already found the tx, check if its ledger is now validated
        if (foundLedgerHash)
        {
            if (isLedgerValidated(*foundLedgerHash))
            {
```
|
// Ledger is validated! Try to read from InboundLedgers first
|
||||||
|
auto ledger = context.app.getInboundLedgers().getPartialLedger(
|
||||||
|
*foundLedgerHash);
|
||||||
|
if (ledger && readTxResult(ledger, "InboundLedgers"))
|
||||||
|
{
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
// Try LedgerMaster (for when synced)
|
||||||
|
if (foundLedgerSeq)
|
||||||
|
{
|
||||||
|
ledger =
|
||||||
|
context.ledgerMaster.getLedgerBySeq(*foundLedgerSeq);
|
||||||
|
if (ledger && readTxResult(ledger, "LedgerMaster"))
|
||||||
|
{
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Ledger validated but can't read yet - keep waiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto const currentValidatedSeq =
|
||||||
|
context.ledgerMaster.getValidLedgerIndex();
|
||||||
|
|
||||||
|
// Search InboundLedgers for the tx (partial sync mode)
|
||||||
|
auto const ledgerHash =
|
||||||
|
context.app.getInboundLedgers().findTxLedger(txHash);
|
||||||
|
|
||||||
|
if (ledgerHash)
|
||||||
|
{
|
||||||
|
auto const ledger =
|
||||||
|
context.app.getInboundLedgers().getPartialLedger(
|
||||||
|
*ledgerHash);
|
||||||
|
|
||||||
|
if (ledger)
|
||||||
|
{
|
||||||
|
foundLedgerHash = ledgerHash;
|
||||||
|
foundLedgerSeq = ledger->info().seq;
|
||||||
|
SWLOG(warn) << "FOUND tx in InboundLedgers seq="
|
||||||
|
<< ledger->info().seq;
|
||||||
|
|
||||||
|
if (isLedgerValidated(*ledgerHash))
|
||||||
|
{
|
||||||
|
if (readTxResult(ledger, "InboundLedgers"))
|
||||||
|
{
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search LedgerMaster for the tx (synced mode via gossip)
|
||||||
|
// Check validated ledgers from startSeq to current
|
||||||
|
if (!foundLedgerHash)
|
||||||
|
{
|
||||||
|
for (auto seq = startSeq; seq <= currentValidatedSeq; ++seq)
|
||||||
|
{
|
||||||
|
auto ledger = context.ledgerMaster.getLedgerBySeq(seq);
|
||||||
|
if (ledger)
|
||||||
|
{
|
||||||
|
auto [sttx, stobj] = ledger->txRead(txHash);
|
||||||
|
if (sttx && stobj)
|
||||||
|
{
|
||||||
|
foundLedgerHash = ledger->info().hash;
|
||||||
|
foundLedgerSeq = seq;
|
||||||
|
SWLOG(warn)
|
||||||
|
<< "FOUND tx in LedgerMaster seq=" << seq;
|
||||||
|
|
||||||
|
// LedgerMaster ledgers are already validated
|
||||||
|
if (readTxResult(ledger, "LedgerMaster"))
|
||||||
|
{
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check LastLedgerSequence expiry
|
||||||
|
if (lastLedgerSeq && currentValidatedSeq > *lastLedgerSeq)
|
||||||
|
{
|
||||||
|
jvResult[jss::error] = "transactionExpired";
|
||||||
|
jvResult[jss::error_message] =
|
||||||
|
"LastLedgerSequence exceeded and transaction not found";
|
||||||
|
jvResult["last_ledger_sequence"] = *lastLedgerSeq;
|
||||||
|
jvResult["validated_ledger"] = currentValidatedSeq;
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep and continue polling
|
||||||
|
context.coro->sleepFor(pollInterval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
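At its core, the handler above is a broadcast-then-poll loop with three exits: validated, expired, or timed out. The following is a minimal, self-contained sketch of just that control flow. The names (pollUntil, Outcome) are illustrative rather than part of the codebase, and std::this_thread::sleep_for stands in for the coroutine's sleepFor, which yields the job thread instead of blocking it.

#include <chrono>
#include <functional>
#include <thread>

enum class Outcome { Validated, Expired, Timeout };

Outcome
pollUntil(
    std::function<bool()> validated,  // probes ledger sources for the tx
    std::function<bool()> expired,    // true once LastLedgerSequence has passed
    std::chrono::milliseconds interval,
    std::chrono::seconds timeout)
{
    auto const deadline = std::chrono::steady_clock::now() + timeout;
    while (std::chrono::steady_clock::now() < deadline)
    {
        if (validated())
            return Outcome::Validated;
        if (expired())
            return Outcome::Expired;
        std::this_thread::sleep_for(interval);  // the real code yields a coroutine
    }
    return Outcome::Timeout;
}

int main()
{
    int polls = 0;
    auto const result = pollUntil(
        [&] { return ++polls >= 3; },  // "validated" on the third probe
        [] { return false; },
        std::chrono::milliseconds(10),
        std::chrono::seconds(1));
    return result == Outcome::Validated ? 0 : 1;
}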
@@ -147,6 +147,7 @@ Handler const handlerArray[]{
     byRef(&doSubmitMultiSigned),
     Role::USER,
     NEEDS_CURRENT_LEDGER},
    {"submit_and_wait", byRef(&doSubmitAndWait), Role::USER, NO_CONDITION},
    {"server_definitions",
     byRef(&doServerDefinitions),
     Role::USER,
@@ -107,6 +107,7 @@ conditionMet(Condition condition_required, T& context)
        return rpcEXPIRED_VALIDATOR_LIST;
    }

    //@@start network-condition-check
    if ((condition_required & NEEDS_NETWORK_CONNECTION) &&
        (context.netOps.getOperatingMode() < OperatingMode::SYNCING))
    {
@@ -117,6 +118,7 @@ conditionMet(Condition condition_required, T& context)
            return rpcNO_NETWORK;
        return rpcNOT_SYNCED;
    }
    //@@end network-condition-check

    if (!context.app.config().standalone() &&
        condition_required & NEEDS_CURRENT_LEDGER)
@@ -17,6 +17,7 @@
*/
//==============================================================================

#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/OpenLedger.h>
@@ -565,6 +566,11 @@ Status
getLedger(T& ledger, uint256 const& ledgerHash, Context& context)
{
    ledger = context.ledgerMaster.getLedgerByHash(ledgerHash);
    if (ledger == nullptr)
    {
        // Partial sync fallback: try to get incomplete ledger being acquired
        ledger = context.app.getInboundLedgers().getPartialLedger(ledgerHash);
    }
    if (ledger == nullptr)
        return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
    return Status::OK;
@@ -605,6 +611,14 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context)
        }
    }

    // Partial sync fallback: try to get incomplete ledger being acquired
    if (ledger == nullptr)
    {
        auto hash = context.ledgerMaster.getHashBySeq(ledgerIndex);
        if (hash.isNonZero())
            ledger = context.app.getInboundLedgers().getPartialLedger(hash);
    }

    if (ledger == nullptr)
        return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
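Both getLedger overloads above now share the same two-tier shape: consult the authoritative store of complete ledgers first, then fall back to whatever partial ledger an in-flight acquisition already holds. A standalone sketch of that shape, under the assumption that std::string keys stand in for uint256 hashes and plain maps stand in for LedgerMaster and InboundLedgers:

#include <map>
#include <memory>
#include <string>

struct Ledger
{
    std::string hash;
    bool complete;
};

using LedgerStore = std::map<std::string, std::shared_ptr<Ledger>>;

std::shared_ptr<Ledger>
lookup(
    LedgerStore const& completeStore,
    LedgerStore const& inFlight,
    std::string const& hash)
{
    if (auto it = completeStore.find(hash); it != completeStore.end())
        return it->second;  // fully synced path
    if (auto it = inFlight.find(hash); it != inFlight.end())
        return it->second;  // partial sync fallback
    return nullptr;         // caller maps this to rpcLGR_NOT_FOUND
}

int main()
{
    LedgerStore complete, inFlight;
    inFlight["abc"] = std::make_shared<Ledger>(Ledger{"abc", false});
    return lookup(complete, inFlight, "abc") ? 0 : 1;
}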
@@ -627,19 +641,88 @@ template <class T>
Status
getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
{
    if (isValidatedOld(
            context.ledgerMaster,
            context.app.config().standalone() ||
            context.app.config().reporting()))
    {
        if (context.apiVersion == 1)
            return {rpcNO_NETWORK, "InsufficientNetworkMode"};
        return {rpcNOT_SYNCED, "notSynced"};
    }
    //@@start sync-validation
    // TODO: Re-enable for production. Disabled for partial sync testing.
    // if (isValidatedOld(
    //         context.ledgerMaster,
    //         context.app.config().standalone() ||
    //         context.app.config().reporting()))
    // {
    //     if (context.apiVersion == 1)
    //         return {rpcNO_NETWORK, "InsufficientNetworkMode"};
    //     return {rpcNOT_SYNCED, "notSynced"};
    // }
    //@@end sync-validation

    if (shortcut == LedgerShortcut::VALIDATED)
    {
        ledger = context.ledgerMaster.getValidatedLedger();

        // Partial sync fallback: try to get incomplete validated ledger
        if (ledger == nullptr)
        {
            auto [hash, seq] = context.ledgerMaster.getLastValidatedLedger();
            JLOG(context.j.warn())
                << "Partial sync: getValidatedLedger null, trying trusted hash="
                << hash << " seq=" << seq;

            // If no trusted validations yet, try network-observed ledger
            if (hash.isZero())
            {
                std::tie(hash, seq) =
                    context.ledgerMaster.getNetworkObservedLedger();
                JLOG(context.j.warn())
                    << "Partial sync: trying network-observed hash=" << hash
                    << " seq=" << seq;

                // Poll-wait for validations to arrive (up to ~10 seconds)
                if (hash.isZero() && context.coro)
                {
                    for (int i = 0; i < 100 && hash.isZero(); ++i)
                    {
                        context.coro->sleepFor(std::chrono::milliseconds(100));
                        std::tie(hash, seq) =
                            context.ledgerMaster.getNetworkObservedLedger();
                    }
                    if (hash.isNonZero())
                    {
                        JLOG(context.j.warn())
                            << "Partial sync: got network-observed hash="
                            << hash << " seq=" << seq;
                    }
                }
            }

            if (hash.isNonZero())
            {
                ledger = context.app.getInboundLedgers().getPartialLedger(hash);
                // If no InboundLedger exists yet, trigger acquisition and wait
                if (!ledger)
                {
                    JLOG(context.j.warn())
                        << "Partial sync: acquiring ledger " << hash;
                    context.app.getInboundLedgers().acquire(
                        hash, seq, InboundLedger::Reason::CONSENSUS);

                    // Poll-wait for the ledger header (up to ~10 seconds)
                    int i = 0;
                    for (; i < 100 && !ledger && context.coro; ++i)
                    {
                        context.coro->sleepFor(std::chrono::milliseconds(100));
                        ledger =
                            context.app.getInboundLedgers().getPartialLedger(
                                hash);
                    }
                    JLOG(context.j.warn())
                        << "Partial sync: poll-wait completed after " << i
                        << " iterations, ledger="
                        << (ledger ? "found" : "null");
                }
            }
            JLOG(context.j.warn()) << "Partial sync: getPartialLedger returned "
                                   << (ledger ? "ledger" : "null");
        }

        if (ledger == nullptr)
        {
            if (context.apiVersion == 1)
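The two poll-waits above (first for a network-observed validation, then for the ledger header) are instances of one pattern: retry a fetch at a fixed interval for a bounded number of iterations. A generic, self-contained sketch under the assumption that waitFor is an illustrative name and std::this_thread::sleep_for replaces the coroutine sleep:

#include <chrono>
#include <functional>
#include <optional>
#include <thread>

template <class T>
std::optional<T>
waitFor(
    std::function<std::optional<T>()> fetch,
    int maxIterations = 100,
    std::chrono::milliseconds interval = std::chrono::milliseconds(100))
{
    for (int i = 0; i < maxIterations; ++i)
    {
        if (auto r = fetch())
            return r;
        std::this_thread::sleep_for(interval);
    }
    return std::nullopt;  // caller falls through to the not-found path
}

int main()
{
    int tries = 0;
    auto const v = waitFor<int>(
        [&]() -> std::optional<int> {
            return ++tries >= 5 ? std::optional<int>(42) : std::nullopt;
        },
        100,
        std::chrono::milliseconds(1));
    return v.value_or(0) == 42 ? 0 : 1;
}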
@@ -292,12 +292,14 @@ ServerHandlerImp::onRequest(Session& session)
    }

    std::shared_ptr<Session> detachedSession = session.detach();
    //@@start rpc-coro-usage
    auto const postResult = m_jobQueue.postCoro(
        jtCLIENT_RPC,
        "RPC-Client",
        [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
            processSession(detachedSession, coro);
        });
    //@@end rpc-coro-usage
    if (postResult == nullptr)
    {
        // The coroutine was rejected, probably because we're shutting down.
@@ -81,9 +81,14 @@ public:
     *
     * @param refNum Sequence of ledger to acquire.
     * @param nodeHash Hash of missing node to report in throw.
     * @param prioritize If true, prioritize fetching this specific node
     *        (used by partial sync mode for RPC queries).
     */
    virtual void
    missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) = 0;
    missingNodeAcquireBySeq(
        std::uint32_t refNum,
        uint256 const& nodeHash,
        bool prioritize = false) = 0;

    /** Acquire ledger that has a missing node by ledger hash
     *
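One C++ detail worth noting about the new bool prioritize = false parameter: default arguments on virtual functions are bound by the static type of the call expression, not dispatched dynamically, which is why this change repeats the same default on the base and on every override. A self-contained illustration of what goes wrong when the defaults diverge:

#include <iostream>

struct Base
{
    virtual ~Base() = default;
    virtual void f(bool prioritize = false)
    {
        std::cout << "Base prioritize=" << prioritize << "\n";
    }
};

struct Derived : Base
{
    void f(bool prioritize = true) override  // divergent default: a trap
    {
        std::cout << "Derived prioritize=" << prioritize << "\n";
    }
};

int main()
{
    Derived d;
    Base& b = d;
    b.f();  // prints "Derived prioritize=0": body from Derived, default from Base
    d.f();  // prints "Derived prioritize=1"
}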
@@ -83,7 +83,10 @@ public:
    reset() override;

    void
    missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override;
    missingNodeAcquireBySeq(
        std::uint32_t seq,
        uint256 const& hash,
        bool prioritize = false) override;

    void
    missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override
@@ -200,6 +200,7 @@ SHAMapInnerNode::isEmptyBranch(int m) const
    return (isBranch_ & (1 << m)) == 0;
}

//@@start full-below-methods
inline bool
SHAMapInnerNode::isFullBelow(std::uint32_t generation) const
{
@@ -211,6 +212,7 @@ SHAMapInnerNode::setFullBelowGen(std::uint32_t gen)
{
    fullBelowGen_ = gen;
}
//@@end full-below-methods

} // namespace ripple

#endif
@@ -29,11 +29,13 @@

namespace ripple {

//@@start shamap-type-enum
enum class SHAMapType {
    TRANSACTION = 1,  // A tree of transactions
    STATE = 2,        // A tree of state nodes
    FREE = 3,         // A tree not part of a ledger
};
//@@end shamap-type-enum

inline std::string
to_string(SHAMapType t)
@@ -52,6 +54,7 @@ to_string(SHAMapType t)
    }
}

//@@start shamap-missing-node-class
class SHAMapMissingNode : public std::runtime_error
{
public:
@@ -67,6 +70,7 @@ public:
    {
    }
};
//@@end shamap-missing-node-class

} // namespace ripple
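The class excerpted above is the typed exception that callers catch to turn a missing tree node into an acquisition attempt rather than a hard failure. A standalone analogue of that pattern; MissingNode and its constructor here are illustrative, since SHAMapMissingNode's own constructor is elided from this hunk:

#include <iostream>
#include <stdexcept>
#include <string>

class MissingNode : public std::runtime_error
{
public:
    explicit MissingNode(std::string const& what) : std::runtime_error(what)
    {
    }
};

int main()
{
    try
    {
        throw MissingNode("Missing node in ledger seq 12345");
    }
    catch (MissingNode const& e)
    {
        // A real caller would kick off an inbound-ledger fetch here.
        std::cout << "would acquire: " << e.what() << "\n";
    }
}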
@@ -89,8 +89,10 @@ public:
    reset() override;

    void
    missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
        override;
    missingNodeAcquireBySeq(
        std::uint32_t seq,
        uint256 const& nodeHash,
        bool prioritize = false) override;

    void
    missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override
@@ -66,9 +66,13 @@ NodeFamily::reset()
}

void
NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
NodeFamily::missingNodeAcquireBySeq(
    std::uint32_t seq,
    uint256 const& nodeHash,
    bool prioritize)
{
    JLOG(j_.error()) << "Missing node in " << seq;
    JLOG(j_.error()) << "Missing node in " << seq << " hash=" << nodeHash
                     << (prioritize ? " [PRIORITY]" : "");
    if (app_.config().reporting())
    {
        std::stringstream ss;
@@ -77,6 +81,10 @@ NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
        Throw<std::runtime_error>(ss.str());
    }

    // Add priority for the specific node hash needed by the query
    if (prioritize && nodeHash.isNonZero())
        app_.getInboundLedgers().addPriorityNode(seq, nodeHash);

    std::unique_lock<std::mutex> lock(maxSeqMutex_);
    if (maxSeq_ == 0)
    {
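addPriorityNode(seq, nodeHash) is called here but its implementation is not part of this diff. Purely as an assumption about its shape, a registry like it could be a mutex-guarded set of (sequence, hash) pairs that the fetch path checks before ordinary work; the sketch below uses std::string in place of uint256:

#include <cstdint>
#include <mutex>
#include <set>
#include <string>
#include <utility>

class PriorityNodes
{
    std::mutex mutex_;
    std::set<std::pair<std::uint32_t, std::string>> nodes_;

public:
    void
    add(std::uint32_t seq, std::string const& hash)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        nodes_.emplace(seq, hash);
    }

    bool
    contains(std::uint32_t seq, std::string const& hash)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return nodes_.count({seq, hash}) != 0;
    }
};

int main()
{
    PriorityNodes priority;
    priority.add(100, "abcd");
    return priority.contains(100, "abcd") ? 0 : 1;
}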
@@ -17,13 +17,16 @@
*/
//==============================================================================

#include <ripple/basics/LocalValue.h>
#include <ripple/basics/contract.h>
#include <ripple/core/JobQueue.h>
#include <ripple/shamap/SHAMap.h>
#include <ripple/shamap/SHAMapAccountStateLeafNode.h>
#include <ripple/shamap/SHAMapNodeID.h>
#include <ripple/shamap/SHAMapSyncFilter.h>
#include <ripple/shamap/SHAMapTxLeafNode.h>
#include <ripple/shamap/SHAMapTxPlusMetaLeafNode.h>
#include <chrono>

namespace ripple {

@@ -150,6 +153,7 @@ SHAMap::walkTowardsKey(uint256 const& id, SharedPtrNodeStack* stack) const
    return static_cast<SHAMapLeafNode*>(inNode.get());
}

//@@start find-key
SHAMapLeafNode*
SHAMap::findKey(uint256 const& id) const
{
@@ -158,6 +162,7 @@ SHAMap::findKey(uint256 const& id) const
        leaf = nullptr;
    return leaf;
}
//@@end find-key

std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const
@@ -183,6 +188,65 @@ SHAMap::finishFetch(
            full_ = false;
            f_.missingNodeAcquireBySeq(ledgerSeq_, hash.as_uint256());
        }

        // If we're in a coroutine context, poll-wait for the node
        if (auto* coro = static_cast<JobQueue::Coro*>(getCurrentCoroPtr()))
        {
            using namespace std::chrono;
            constexpr auto pollInterval = 50ms;
            constexpr auto defaultTimeout = 30s;
            // Use coroutine-local timeout if set, otherwise default
            auto coroTimeout = getCoroFetchTimeout();
            auto timeout =
                coroTimeout.count() > 0 ? coroTimeout : defaultTimeout;
            auto const deadline = steady_clock::now() + timeout;

            // Linear backoff for re-requests: 50ms, 100ms, 150ms... up to 2s
            auto nextRequestDelay = 50ms;
            constexpr auto maxRequestDelay = 2000ms;
            constexpr auto backoffStep = 50ms;
            auto nextRequestTime = steady_clock::now() + nextRequestDelay;

            JLOG(journal_.debug())
                << "finishFetch: waiting for node " << hash;

            while (steady_clock::now() < deadline)
            {
                // Sleep for the poll interval (yields coroutine, frees job thread)
                coro->sleepFor(pollInterval);

                // Try to fetch from cache/db again
                if (auto obj = f_.db().fetchNodeObject(
                        hash.as_uint256(), ledgerSeq_))
                {
                    JLOG(journal_.debug())
                        << "finishFetch: got node " << hash;
                    auto node = SHAMapTreeNode::makeFromPrefix(
                        makeSlice(obj->getData()), hash);
                    if (node)
                        canonicalize(hash, node);
                    return node;
                }

                // Re-request with priority using linear backoff
                auto now = steady_clock::now();
                if (now >= nextRequestTime)
                {
                    f_.missingNodeAcquireBySeq(
                        ledgerSeq_, hash.as_uint256(), true /*prioritize*/);
                    // Increase delay for next request (linear backoff)
                    if (nextRequestDelay < maxRequestDelay)
                        nextRequestDelay += backoffStep;
                    nextRequestTime = now + nextRequestDelay;
                }
            }

            JLOG(journal_.warn())
                << "finishFetch: timeout waiting for node " << hash;
        }

        return {};
    }
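The re-request schedule in the loop above grows linearly (50ms, 100ms, 150ms, and so on, capped at 2s), so a node that stays missing for the full 30s default timeout triggers at most a few dozen prioritized re-requests rather than one per poll. A tiny standalone program that prints the first part of that schedule:

#include <chrono>
#include <iostream>

int main()
{
    using namespace std::chrono;
    auto delay = 50ms;
    constexpr auto step = 50ms;
    constexpr auto cap = 2000ms;
    for (int request = 1; request <= 8; ++request)
    {
        std::cout << "re-request " << request << " fires after "
                  << delay.count() << "ms\n";
        if (delay < cap)
            delay += step;  // linear backoff, same as the code above
    }
}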
@@ -264,6 +328,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
    }
*/

//@@start fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
{
@@ -305,6 +370,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const

    return nullptr;
}
//@@end fetch-with-timeout

std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash) const
@@ -329,6 +395,7 @@ SHAMap::fetchNode(SHAMapHash const& hash) const
    return node;
}

//@@start throw-on-missing
SHAMapTreeNode*
SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
{
@@ -339,6 +406,7 @@ SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const

    return ret;
}
//@@end throw-on-missing

std::shared_ptr<SHAMapTreeNode>
SHAMap::descendThrow(std::shared_ptr<SHAMapInnerNode> const& parent, int branch)
@@ -426,6 +494,7 @@ SHAMap::descend(
    return std::make_pair(child, parentID.getChildNodeID(branch));
}

//@@start async-fetch
SHAMapTreeNode*
SHAMap::descendAsync(
    SHAMapInnerNode* parent,
@@ -448,6 +517,7 @@ SHAMap::descendAsync(
    if (filter)
        ptr = checkFilter(hash, filter);

    //@@start db-async-fetch
    if (!ptr && backed_)
    {
        f_.db().asyncFetch(
@@ -461,6 +531,7 @@ SHAMap::descendAsync(
            pending = true;
            return nullptr;
        }
        //@@end db-async-fetch
    }

    if (ptr)
@@ -468,6 +539,7 @@ SHAMap::descendAsync(

    return ptr.get();
}
//@@end async-fetch

template <class Node>
std::shared_ptr<Node>
@@ -153,11 +153,17 @@ ShardFamily::reset()
}

void
ShardFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
ShardFamily::missingNodeAcquireBySeq(
    std::uint32_t seq,
    uint256 const& nodeHash,
    bool prioritize)
{
    std::ignore = nodeHash;
    JLOG(j_.error()) << "Missing node in ledger sequence " << seq;

    // Add priority for the specific node hash needed by the query
    if (prioritize && nodeHash.isNonZero())
        app_.getInboundLedgers().addPriorityNode(seq, nodeHash);

    std::unique_lock<std::mutex> lock(maxSeqMutex_);
    if (maxSeq_ == 0)
    {
@@ -126,6 +126,34 @@ public:
        return {};
    }

    virtual std::shared_ptr<Ledger const>
    getPartialLedger(uint256 const& hash) override
    {
        return {};
    }

    virtual std::optional<uint256>
    findTxLedger(uint256 const& txHash) override
    {
        return std::nullopt;
    }

    virtual void
    addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
    {
    }

    virtual void
    prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
    {
    }

    virtual bool
    isTxPrioritized(std::uint32_t seq) const override
    {
        return false;
    }

    virtual bool
    gotLedgerData(
        LedgerHash const& ledgerHash,
@@ -105,8 +105,10 @@ public:
    }

    void
    missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
        override
    missingNodeAcquireBySeq(
        std::uint32_t refNum,
        uint256 const& nodeHash,
        bool prioritize = false) override
    {
        Throw<std::runtime_error>("missing node");
    }