Merge remote-tracking branch 'origin/develop' into pratik/Fix_asan_lsan_flagged_issues

This commit is contained in:
Pratik Mankawde
2026-02-13 13:11:22 +00:00
60 changed files with 263 additions and 227 deletions

View File

@@ -0,0 +1,224 @@
#pragma once
#include <xrpl/basics/CountedObject.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/Book.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/resource/Consumer.h>
#include <xrpl/server/Manifest.h>
namespace xrpl {
// Operations that clients may wish to perform against the network
// Master operational handler, server sequencer, network tracker
class InfoSubRequest : public CountedObject<InfoSubRequest>
{
public:
using pointer = std::shared_ptr<InfoSubRequest>;
virtual ~InfoSubRequest() = default;
virtual Json::Value
doClose() = 0;
virtual Json::Value
doStatus(Json::Value const&) = 0;
};
/** Manages a client's subscription to data feeds.
*/
class InfoSub : public CountedObject<InfoSub>
{
public:
using pointer = std::shared_ptr<InfoSub>;
// VFALCO TODO Standardize on the names of weak / strong pointer type
// aliases.
using wptr = std::weak_ptr<InfoSub>;
using ref = std::shared_ptr<InfoSub> const&;
using Consumer = Resource::Consumer;
public:
/** Abstracts the source of subscription data.
*/
class Source
{
public:
virtual ~Source() = default;
// For some reason, these were originally called "rt"
// for "real time". They actually refer to whether
// you get transactions as they occur or once their
// results are confirmed
virtual void
subAccount(ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool realTime) = 0;
// for normal use, removes from InfoSub and server
virtual void
unsubAccount(ref isplistener, hash_set<AccountID> const& vnaAccountIDs, bool realTime) = 0;
// for use during InfoSub destruction
// Removes only from the server
virtual void
unsubAccountInternal(std::uint64_t uListener, hash_set<AccountID> const& vnaAccountIDs, bool realTime) = 0;
/**
* subscribe an account's new transactions and retrieve the account's
* historical transactions
* @return rpcSUCCESS if successful, otherwise an error code
*/
virtual error_code_i
subAccountHistory(ref ispListener, AccountID const& account) = 0;
/**
* unsubscribe an account's transactions
* @param historyOnly if true, only stop historical transactions
* @note once a client receives enough historical transactions,
* it should unsubscribe with historyOnly == true to stop receiving
* more historical transactions. It will continue to receive new
* transactions.
*/
virtual void
unsubAccountHistory(ref ispListener, AccountID const& account, bool historyOnly) = 0;
virtual void
unsubAccountHistoryInternal(std::uint64_t uListener, AccountID const& account, bool historyOnly) = 0;
// VFALCO TODO Document the bool return value
virtual bool
subLedger(ref ispListener, Json::Value& jvResult) = 0;
virtual bool
unsubLedger(std::uint64_t uListener) = 0;
virtual bool
subBookChanges(ref ispListener) = 0;
virtual bool
unsubBookChanges(std::uint64_t uListener) = 0;
virtual bool
subManifests(ref ispListener) = 0;
virtual bool
unsubManifests(std::uint64_t uListener) = 0;
virtual void
pubManifest(Manifest const&) = 0;
virtual bool
subServer(ref ispListener, Json::Value& jvResult, bool admin) = 0;
virtual bool
unsubServer(std::uint64_t uListener) = 0;
virtual bool
subBook(ref ispListener, Book const&) = 0;
virtual bool
unsubBook(std::uint64_t uListener, Book const&) = 0;
virtual bool
subTransactions(ref ispListener) = 0;
virtual bool
unsubTransactions(std::uint64_t uListener) = 0;
virtual bool
subRTTransactions(ref ispListener) = 0;
virtual bool
unsubRTTransactions(std::uint64_t uListener) = 0;
virtual bool
subValidations(ref ispListener) = 0;
virtual bool
unsubValidations(std::uint64_t uListener) = 0;
virtual bool
subPeerStatus(ref ispListener) = 0;
virtual bool
unsubPeerStatus(std::uint64_t uListener) = 0;
virtual void
pubPeerStatus(std::function<Json::Value(void)> const&) = 0;
virtual bool
subConsensus(ref ispListener) = 0;
virtual bool
unsubConsensus(std::uint64_t uListener) = 0;
// VFALCO TODO Remove
// This was added for one particular partner, it
// "pushes" subscription data to a particular URL.
//
virtual pointer
findRpcSub(std::string const& strUrl) = 0;
virtual pointer
addRpcSub(std::string const& strUrl, ref rspEntry) = 0;
virtual bool
tryRemoveRpcSub(std::string const& strUrl) = 0;
};
public:
InfoSub(Source& source);
InfoSub(Source& source, Consumer consumer);
virtual ~InfoSub();
Consumer&
getConsumer();
virtual void
send(Json::Value const& jvObj, bool broadcast) = 0;
std::uint64_t
getSeq();
void
onSendEmpty();
void
insertSubAccountInfo(AccountID const& account, bool rt);
void
deleteSubAccountInfo(AccountID const& account, bool rt);
// return false if already subscribed to this account
bool
insertSubAccountHistory(AccountID const& account);
void
deleteSubAccountHistory(AccountID const& account);
void
clearRequest();
void
setRequest(std::shared_ptr<InfoSubRequest> const& req);
std::shared_ptr<InfoSubRequest> const&
getRequest();
void
setApiVersion(unsigned int apiVersion);
unsigned int
getApiVersion() const noexcept;
protected:
std::mutex mLock;
private:
Consumer m_consumer;
Source& m_source;
hash_set<AccountID> realTimeSubscriptions_;
hash_set<AccountID> normalSubscriptions_;
std::shared_ptr<InfoSubRequest> request_;
std::uint64_t mSeq;
hash_set<AccountID> accountHistorySubscriptions_;
unsigned int apiVersion_ = 0;
static int
assign_id()
{
static std::atomic<std::uint64_t> id(0);
return ++id;
}
};
} // namespace xrpl

View File

@@ -0,0 +1,249 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/ServiceRegistry.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/messages.h>
#include <xrpl/server/InfoSub.h>
#include <xrpl/shamap/SHAMap.h>
#include <boost/asio.hpp>
#include <memory>
namespace xrpl {
// Operations that clients may wish to perform against the network
// Master operational handler, server sequencer, network tracker
class Peer;
class LedgerMaster;
class Transaction;
class ValidatorKeys;
class CanonicalTXSet;
class RCLCxPeerPos;
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synced, unsynced,
// etcetera) and defer to the correct means of processing. The current
// code assumes this node is synced (and will continue to do so until
// there's a functional network.
//
/** Specifies the mode under which the server believes it's operating.
This has implications about how the server processes transactions and
how it responds to requests (e.g. account balance request).
@note Other code relies on the numerical values of these constants; do
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
SYNCING = 2, //!< fallen slightly behind
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
/** Provides server functionality for clients.
Clients include backend applications, local commands, and connected
clients. This class acts as a proxy, fulfilling the command with local
data if possible, or asking the network and returning the results if
needed.
A backend application or local client can trust a local instance of
rippled / NetworkOPs. However, client software connecting to non-local
instances of rippled will need to be hardened to protect against hostile
or unreliable servers.
*/
class NetworkOPs : public InfoSub::Source
{
public:
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
enum class FailHard : unsigned char { no, yes };
static inline FailHard
doFailHard(bool noMeansDont)
{
return noMeansDont ? FailHard::yes : FailHard::no;
}
public:
~NetworkOPs() override = default;
virtual void
stop() = 0;
//--------------------------------------------------------------------------
//
// Network information
//
virtual OperatingMode
getOperatingMode() const = 0;
virtual std::string
strOperatingMode(OperatingMode const mode, bool const admin = false) const = 0;
virtual std::string
strOperatingMode(bool const admin = false) const = 0;
//--------------------------------------------------------------------------
//
// Transaction processing
//
// must complete immediately
virtual void
submitTransaction(std::shared_ptr<STTx const> const&) = 0;
/**
* Process transactions as they arrive from the network or which are
* submitted by clients. Process local transactions synchronously
*
* @param transaction Transaction object
* @param bUnlimited Whether a privileged client connection submitted it.
* @param bLocal Client submission.
* @param failType fail_hard setting from transaction submission.
*/
virtual void
processTransaction(std::shared_ptr<Transaction>& transaction, bool bUnlimited, bool bLocal, FailHard failType) = 0;
/**
* Process a set of transactions synchronously, and ensuring that they are
* processed in one batch.
*
* @param set Transaction object set
*/
virtual void
processTransactionSet(CanonicalTXSet const& set) = 0;
//--------------------------------------------------------------------------
//
// Owner functions
//
virtual Json::Value
getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account) = 0;
//--------------------------------------------------------------------------
//
// Book functions
//
virtual void
getBookPage(
std::shared_ptr<ReadView const>& lpLedger,
Book const& book,
AccountID const& uTakerID,
bool const bProof,
unsigned int iLimit,
Json::Value const& jvMarker,
Json::Value& jvResult) = 0;
//--------------------------------------------------------------------------
// ledger proposal/close functions
virtual bool
processTrustedProposal(RCLCxPeerPos peerPos) = 0;
virtual bool
recvValidation(std::shared_ptr<STValidation> const& val, std::string const& source) = 0;
virtual void
mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) = 0;
// network state machine
virtual bool
beginConsensus(uint256 const& netLCL, std::unique_ptr<std::stringstream> const& clog) = 0;
virtual void
endConsensus(std::unique_ptr<std::stringstream> const& clog) = 0;
virtual void
setStandAlone() = 0;
virtual void
setStateTimer() = 0;
virtual void
setNeedNetworkLedger() = 0;
virtual void
clearNeedNetworkLedger() = 0;
virtual bool
isNeedNetworkLedger() = 0;
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
virtual bool
isBlocked() = 0;
virtual bool
isAmendmentBlocked() = 0;
virtual void
setAmendmentBlocked() = 0;
virtual bool
isAmendmentWarned() = 0;
virtual void
setAmendmentWarned() = 0;
virtual void
clearAmendmentWarned() = 0;
virtual bool
isUNLBlocked() = 0;
virtual void
setUNLBlocked() = 0;
virtual void
clearUNLBlocked() = 0;
virtual void
consensusViewChange() = 0;
virtual Json::Value
getConsensusInfo() = 0;
virtual Json::Value
getServerInfo(bool human, bool admin, bool counters) = 0;
virtual void
clearLedgerFetch() = 0;
virtual Json::Value
getLedgerFetchInfo() = 0;
/** Accepts the current transaction tree, return the new ledger's sequence
This API is only used via RPC with the server in STANDALONE mode and
performs a virtual consensus round, with all the transactions we are
proposing being accepted.
*/
virtual std::uint32_t
acceptLedger(std::optional<std::chrono::milliseconds> consensusDelay = std::nullopt) = 0;
virtual void
reportFeeChange() = 0;
virtual void
updateLocalTx(ReadView const& newValidLedger) = 0;
virtual std::size_t
getLocalTxCount() = 0;
//--------------------------------------------------------------------------
//
// Monitoring: publisher side
//
virtual void
pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) = 0;
virtual void
pubProposedTransaction(
std::shared_ptr<ReadView const> const& ledger,
std::shared_ptr<STTx const> const& transaction,
TER result) = 0;
virtual void
pubValidation(std::shared_ptr<STValidation> const& val) = 0;
virtual void
stateAccounting(Json::Value& obj) = 0;
};
} // namespace xrpl