Implement tx stream with history

This commit is contained in:
Peng Wang
2021-03-31 10:11:47 -04:00
committed by manojsdoshi
parent 00a4c3a478
commit e2a42184b9
7 changed files with 1203 additions and 67 deletions

View File

@@ -38,6 +38,8 @@
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/app/misc/impl/AccountTxPaging.h>
#include <ripple/app/rdb/RelationalDBInterface.h>
#include <ripple/app/rdb/backend/RelationalDBInterfacePostgres.h>
#include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/PerfLog.h>
@@ -51,6 +53,7 @@
#include <ripple/crypto/RFC1751.h>
#include <ripple/crypto/csprng.h>
#include <ripple/json/to_string.h>
#include <ripple/net/RPCErr.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/Overlay.h>
@@ -58,6 +61,7 @@
#include <ripple/protocol/BuildInfo.h>
#include <ripple/protocol/Feature.h>
#include <ripple/protocol/STParsedJSON.h>
#include <ripple/resource/Fees.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/rpc/DeliveredAmount.h>
#include <ripple/rpc/impl/RPCHelpers.h>
@@ -229,6 +233,7 @@ public:
, mMode(start_valid ? OperatingMode::FULL : OperatingMode::DISCONNECTED)
, heartbeatTimer_(io_svc)
, clusterTimer_(io_svc)
, accountHistoryTxTimer_(io_svc)
, mConsensus(
app,
make_FeeVote(
@@ -481,6 +486,21 @@ public:
hash_set<AccountID> const& vnaAccountIDs,
bool rt) override;
error_code_i
subAccountHistory(InfoSub::ref ispListener, AccountID const& account)
override;
void
unsubAccountHistory(
InfoSub::ref ispListener,
AccountID const& account,
bool historyOnly) override;
void
unsubAccountHistoryInternal(
std::uint64_t seq,
AccountID const& account,
bool historyOnly) override;
bool
subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
bool
@@ -559,6 +579,15 @@ public:
<< "NetworkOPs: clusterTimer cancel error: "
<< ec.message();
}
ec.clear();
accountHistoryTxTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done.
using namespace std::chrono_literals;
@@ -567,6 +596,12 @@ public:
private:
void
setTimer(
boost::asio::steady_timer& timer,
std::chrono::milliseconds const& expiry_time,
std::function<void()> onExpire,
std::function<void()> onError);
void
setHeartbeatTimer();
void
setClusterTimer();
@@ -605,6 +640,63 @@ private:
using SubInfoMapType = hash_map<AccountID, SubMapType>;
using subRpcMapType = hash_map<std::string, InfoSub::pointer>;
/*
* With a validated ledger to separate history and future, the node
* streams historical txns with negative indexes starting from -1,
* and streams future txns starting from index 0.
* The SubAccountHistoryIndex struct maintains these indexes.
* It also has a flag stopHistorical_ for stopping streaming
* the historical txns.
*/
struct SubAccountHistoryIndex
{
AccountID const accountId_;
// forward
std::uint32_t forwardTxIndex_;
// separate backward and forward
std::uint32_t separationLedgerSeq_;
// history, backward
std::uint32_t historyLastLedgerSeq_;
std::int32_t historyTxIndex_;
bool haveHistorical_;
std::atomic<bool> stopHistorical_;
SubAccountHistoryIndex(AccountID const& accountId)
: accountId_(accountId)
, forwardTxIndex_(0)
, separationLedgerSeq_(0)
, historyLastLedgerSeq_(0)
, historyTxIndex_(-1)
, haveHistorical_(false)
, stopHistorical_(false)
{
}
};
struct SubAccountHistoryInfo
{
InfoSub::pointer sink_;
std::shared_ptr<SubAccountHistoryIndex> index_;
};
struct SubAccountHistoryInfoWeak
{
InfoSub::wptr sinkWptr_;
std::shared_ptr<SubAccountHistoryIndex> index_;
};
using SubAccountHistoryMapType =
hash_map<AccountID, hash_map<std::uint64_t, SubAccountHistoryInfoWeak>>;
/**
* @note called while holding mSubLock
*/
void
subAccountHistoryStart(
std::shared_ptr<ReadView const> const& ledger,
SubAccountHistoryInfoWeak& subInfo);
void
addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo);
void
setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo);
Application& app_;
beast::Journal m_journal;
@@ -622,6 +714,7 @@ private:
ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
boost::asio::steady_timer heartbeatTimer_;
boost::asio::steady_timer clusterTimer_;
boost::asio::steady_timer accountHistoryTxTimer_;
RCLConsensus mConsensus;
@@ -634,6 +727,8 @@ private:
subRpcMapType mRpcSubMap;
SubAccountHistoryMapType mSubAccountHistory;
enum SubTypes {
sLedger, // Accepted ledgers.
sManifests, // Received validator manifests.
@@ -742,6 +837,10 @@ std::array<Json::StaticString const, 5> const
Json::StaticString(stateNames[3]),
Json::StaticString(stateNames[4])}};
static auto const genesisAccountId = calcAccountID(
generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase"))
.first);
//------------------------------------------------------------------------------
inline OperatingMode
NetworkOPsImp::getOperatingMode() const
@@ -812,18 +911,19 @@ NetworkOPsImp::setStateTimer()
}
void
NetworkOPsImp::setHeartbeatTimer()
NetworkOPsImp::setTimer(
boost::asio::steady_timer& timer,
const std::chrono::milliseconds& expiry_time,
std::function<void()> onExpire,
std::function<void()> onError)
{
// Only start the timer if waitHandlerCounter_ is not yet joined.
if (auto optionalCountedHandler = waitHandlerCounter_.wrap(
[this](boost::system::error_code const& e) {
[this, onExpire, onError](boost::system::error_code const& e) {
if ((e.value() == boost::system::errc::success) &&
(!m_job_queue.isStopped()))
{
m_job_queue.addJob(
jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) {
processHeartbeatTimer();
});
onExpire();
}
// Recover as best we can if an unexpected error occurs.
if (e.value() != boost::system::errc::success &&
@@ -831,47 +931,57 @@ NetworkOPsImp::setHeartbeatTimer()
{
// Try again later and hope for the best.
JLOG(m_journal.error())
<< "Heartbeat timer got error '" << e.message()
<< "Timer got error '" << e.message()
<< "'. Restarting timer.";
setHeartbeatTimer();
onError();
}
}))
{
heartbeatTimer_.expires_from_now(mConsensus.parms().ledgerGRANULARITY);
heartbeatTimer_.async_wait(std::move(*optionalCountedHandler));
timer.expires_from_now(expiry_time);
timer.async_wait(std::move(*optionalCountedHandler));
}
}
void
NetworkOPsImp::setHeartbeatTimer()
{
setTimer(
heartbeatTimer_,
mConsensus.parms().ledgerGRANULARITY,
[this]() {
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) {
processHeartbeatTimer();
});
},
[this]() { setHeartbeatTimer(); });
}
void
NetworkOPsImp::setClusterTimer()
{
// Only start the timer if waitHandlerCounter_ is not yet joined.
if (auto optionalCountedHandler = waitHandlerCounter_.wrap(
[this](boost::system::error_code const& e) {
if ((e.value() == boost::system::errc::success) &&
(!m_job_queue.isStopped()))
{
m_job_queue.addJob(
jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) {
processClusterTimer();
});
}
// Recover as best we can if an unexpected error occurs.
if (e.value() != boost::system::errc::success &&
e.value() != boost::asio::error::operation_aborted)
{
// Try again later and hope for the best.
JLOG(m_journal.error())
<< "Cluster timer got error '" << e.message()
<< "'. Restarting timer.";
setClusterTimer();
}
}))
{
using namespace std::chrono_literals;
clusterTimer_.expires_from_now(10s);
clusterTimer_.async_wait(std::move(*optionalCountedHandler));
}
using namespace std::chrono_literals;
setTimer(
clusterTimer_,
10s,
[this]() {
m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) {
processClusterTimer();
});
},
[this]() { setClusterTimer(); });
}
void
NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
{
JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
<< toBase58(subInfo.index_->accountId_);
using namespace std::chrono_literals;
setTimer(
accountHistoryTxTimer_,
4s,
[this, subInfo]() { addAccountHistoryJob(subInfo); },
[this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
}
void
@@ -2780,6 +2890,27 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
it = mStreamMaps[sLedger].erase(it);
}
}
{
static bool firstTime = true;
if (firstTime)
{
// First validated ledger, start delayed SubAccountHistory
firstTime = false;
for (auto& outer : mSubAccountHistory)
{
for (auto& inner : outer.second)
{
auto& subInfo = inner.second;
if (subInfo.index_->separationLedgerSeq_ == 0)
{
subAccountHistoryStart(
alpAccepted->getLedger(), subInfo);
}
}
}
}
}
}
// Don't lock since pubAcceptedTransaction is locking.
@@ -2952,18 +3083,21 @@ NetworkOPsImp::pubAccountTransaction(
int iProposed = 0;
int iAccepted = 0;
std::vector<SubAccountHistoryInfo> accountHistoryNotify;
auto const currLedgerSeq = lpCurrent->seq();
{
std::lock_guard sl(mSubLock);
if (!bAccepted && mSubRTAccount.empty())
return;
if (!mSubAccount.empty() || (!mSubRTAccount.empty()))
if (!mSubAccount.empty() || (!mSubRTAccount.empty()) ||
!mSubAccountHistory.empty())
{
for (auto const& affectedAccount : alTx.getAffected())
{
auto simiIt = mSubRTAccount.find(affectedAccount);
if (simiIt != mSubRTAccount.end())
if (auto simiIt = mSubRTAccount.find(affectedAccount);
simiIt != mSubRTAccount.end())
{
auto it = simiIt->second.begin();
@@ -2984,9 +3118,8 @@ NetworkOPsImp::pubAccountTransaction(
if (bAccepted)
{
simiIt = mSubAccount.find(affectedAccount);
if (simiIt != mSubAccount.end())
if (auto simiIt = mSubAccount.find(affectedAccount);
simiIt != mSubAccount.end())
{
auto it = simiIt->second.begin();
while (it != simiIt->second.end())
@@ -3003,15 +3136,46 @@ NetworkOPsImp::pubAccountTransaction(
it = simiIt->second.erase(it);
}
}
if (auto histoIt = mSubAccountHistory.find(affectedAccount);
histoIt != mSubAccountHistory.end())
{
auto& subs = histoIt->second;
auto it = subs.begin();
while (it != subs.end())
{
SubAccountHistoryInfoWeak const& info = it->second;
if (currLedgerSeq <=
info.index_->separationLedgerSeq_)
{
++it;
continue;
}
if (auto isSptr = info.sinkWptr_.lock(); isSptr)
{
accountHistoryNotify.emplace_back(
SubAccountHistoryInfo{isSptr, info.index_});
++it;
}
else
{
it = subs.erase(it);
}
}
if (subs.empty())
mSubAccountHistory.erase(histoIt);
}
}
}
}
}
JLOG(m_journal.trace())
<< "pubAccountTransaction:"
<< " iProposed=" << iProposed << " iAccepted=" << iAccepted;
if (!notify.empty())
if (!notify.empty() || !accountHistoryNotify.empty())
{
std::shared_ptr<STTx const> stTxn = alTx.getTxn();
Json::Value jvObj =
@@ -3029,6 +3193,16 @@ NetworkOPsImp::pubAccountTransaction(
for (InfoSub::ref isrListener : notify)
isrListener->send(jvObj, true);
assert(!jvObj.isMember(jss::account_history_tx_stream));
for (auto& info : accountHistoryNotify)
{
auto& index = info.index_;
if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
jvObj[jss::account_history_tx_first] = true;
jvObj[jss::account_history_tx_index] = index->forwardTxIndex_++;
info.sink_->send(jvObj, true);
}
}
}
@@ -3117,6 +3291,456 @@ NetworkOPsImp::unsubAccountInternal(
}
}
void
NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
{
enum DatabaseType { Postgres, Sqlite, None };
static const auto databaseType = [&]() -> DatabaseType {
#ifdef RIPPLED_REPORTING
if (app_.config().reporting())
{
if (dynamic_cast<RelationalDBInterfacePostgres*>(
&app_.getRelationalDBInterface()))
{
return DatabaseType::Postgres;
}
return DatabaseType::None;
}
else
{
if (dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface()))
{
return DatabaseType::Sqlite;
}
return DatabaseType::None;
}
#else
if (dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface()))
{
return DatabaseType::Sqlite;
}
return DatabaseType::None;
#endif
}();
if (databaseType == DatabaseType::None)
{
JLOG(m_journal.error())
<< "AccountHistory job for account "
<< toBase58(subInfo.index_->accountId_) << " no database";
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->send(rpcError(rpcINTERNAL), true);
unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
}
return;
}
app_.getJobQueue().addJob(
jtCLIENT,
"AccountHistoryTxStream",
[this, dbType = databaseType, subInfo](Job&) {
auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " started. lastLedgerSeq=" << lastLedgerSeq;
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
std::shared_ptr<TxMeta> const& meta) -> bool {
/*
* genesis account: first tx is the one with seq 1
* other account: first tx is the one created the account
*/
if (accountId == genesisAccountId)
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId &&
stx->getSeqProxy().value() == 1)
return true;
}
for (auto& node : meta->getNodes())
{
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
continue;
if (node.isFieldPresent(sfNewFields))
{
if (auto inner = dynamic_cast<const STObject*>(
node.peekAtPField(sfNewFields));
inner)
{
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) == accountId)
{
return true;
}
}
}
}
return false;
};
auto send = [&](Json::Value const& jvObj,
bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->send(jvObj, true);
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns =
[&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDBInterface::AccountTxMarker>
marker)
-> std::optional<std::pair<
RelationalDBInterface::AccountTxs,
std::optional<RelationalDBInterface::AccountTxMarker>>> {
switch (dbType)
{
case Postgres: {
auto db = static_cast<RelationalDBInterfacePostgres*>(
&app_.getRelationalDBInterface());
RelationalDBInterface::AccountTxArgs args;
args.account = accountId;
LedgerRange range{minLedger, maxLedger};
args.ledger = range;
args.marker = marker;
auto [txResult, status] = db->getAccountTx(args);
if (status != rpcSUCCESS)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " getAccountTx failed";
return {};
}
if (auto txns =
std::get_if<RelationalDBInterface::AccountTxs>(
&txResult.transactions);
txns)
{
return std::make_pair(*txns, txResult.marker);
}
else
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " getAccountTx wrong data";
return {};
}
}
case Sqlite: {
auto db = static_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface());
RelationalDBInterface::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db->newestAccountTxPage(options);
}
default: {
assert(false);
return {};
}
}
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
{
int feeChargeCount = 0;
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
++feeChargeCount;
}
else
{
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId) << " no InfoSub. Fee charged "
<< feeChargeCount << " times.";
return;
}
// try to search in 1024 ledgers till reaching genesis ledgers
auto startLedgerSeq =
(lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< ", working on ledger range [" << startLedgerSeq << ","
<< lastLedgerSeq << "]";
auto haveRange = [&]() -> bool {
std::uint32_t validatedMin = UINT_MAX;
std::uint32_t validatedMax = 0;
auto haveSomeValidatedLedgers =
app_.getLedgerMaster().getValidatedRange(
validatedMin, validatedMax);
return haveSomeValidatedLedgers &&
validatedMin <= startLedgerSeq &&
lastLedgerSeq <= validatedMax;
}();
if (!haveRange)
{
JLOG(m_journal.debug())
<< "AccountHistory reschedule job for account "
<< toBase58(accountId) << ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq << "]";
setAccountHistoryJobTimer(subInfo);
return;
}
std::optional<RelationalDBInterface::AccountTxMarker> marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult =
getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
if (!dbResult)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId) << " getMoreTxns failed.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto const& txns = dbResult->first;
marker = dbResult->second;
for (auto const& [tx, meta] : txns)
{
if (!tx || !meta)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId) << " empty tx or meta.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger =
app_.getLedgerMaster().getLedgerBySeq(
tx->getLedger());
if (!curTxLedger)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
}
std::shared_ptr<STTx const> stTxn =
tx->getSTransaction();
if (!stTxn)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
}
Json::Value jvTx = transJson(
*stTxn, meta->getResultTER(), true, curTxLedger);
jvTx[jss::meta] = meta->getJson(JsonOptions::none);
jvTx[jss::account_history_tx_index] = txHistoryIndex--;
RPC::insertDeliveredAmount(
jvTx[jss::meta], *curTxLedger, stTxn, *meta);
if (isFirstTx(tx, meta))
{
jvTx[jss::account_history_tx_first] = true;
send(jvTx, false);
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " done, found last tx.";
return;
}
else
{
send(jvTx, false);
}
}
if (marker)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " paging, marker=" << marker->ledgerSeq << ":"
<< marker->txnSeq;
}
else
{
break;
}
}
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " done, reached genesis ledger.";
return;
}
}
}
});
}
void
NetworkOPsImp::subAccountHistoryStart(
std::shared_ptr<ReadView const> const& ledger,
SubAccountHistoryInfoWeak& subInfo)
{
subInfo.index_->separationLedgerSeq_ = ledger->seq();
auto const& accountId = subInfo.index_->accountId_;
auto const accountKeylet = keylet::account(accountId);
if (!ledger->exists(accountKeylet))
{
JLOG(m_journal.debug())
<< "subAccountHistoryStart, no account " << toBase58(accountId)
<< ", no need to add AccountHistory job.";
return;
}
if (accountId == genesisAccountId)
{
if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
{
if (sleAcct->getFieldU32(sfSequence) == 1)
{
JLOG(m_journal.debug())
<< "subAccountHistoryStart, genesis account "
<< toBase58(accountId)
<< " does not have tx, no need to add AccountHistory job.";
return;
}
}
else
{
assert(false);
return;
}
}
subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
subInfo.index_->haveHistorical_ = true;
JLOG(m_journal.debug())
<< "subAccountHistoryStart, add AccountHistory job: accountId="
<< toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq();
addAccountHistoryJob(subInfo);
}
error_code_i
NetworkOPsImp::subAccountHistory(
InfoSub::ref isrListener,
AccountID const& accountId)
{
if (!isrListener->insertSubAccountHistory(accountId))
{
JLOG(m_journal.debug())
<< "subAccountHistory, already subscribed to account "
<< toBase58(accountId);
return rpcINVALID_PARAMS;
}
std::lock_guard sl(mSubLock);
SubAccountHistoryInfoWeak ahi{
isrListener, std::make_shared<SubAccountHistoryIndex>(accountId)};
auto simIterator = mSubAccountHistory.find(accountId);
if (simIterator == mSubAccountHistory.end())
{
hash_map<std::uint64_t, SubAccountHistoryInfoWeak> inner;
inner.emplace(isrListener->getSeq(), ahi);
mSubAccountHistory.insert(
simIterator, std::make_pair(accountId, inner));
}
else
{
simIterator->second.emplace(isrListener->getSeq(), ahi);
}
auto const ledger = app_.getLedgerMaster().getValidatedLedger();
if (ledger)
{
subAccountHistoryStart(ledger, ahi);
}
else
{
// The node does not have validated ledgers, so wait for
// one before start streaming.
// In this case, the subscription is also considered successful.
JLOG(m_journal.debug())
<< "subAccountHistory, no validated ledger yet, delay start";
}
return rpcSUCCESS;
}
void
NetworkOPsImp::unsubAccountHistory(
InfoSub::ref isrListener,
AccountID const& account,
bool historyOnly)
{
if (!historyOnly)
isrListener->deleteSubAccountHistory(account);
unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
}
void
NetworkOPsImp::unsubAccountHistoryInternal(
std::uint64_t seq,
const AccountID& account,
bool historyOnly)
{
std::lock_guard sl(mSubLock);
auto simIterator = mSubAccountHistory.find(account);
if (simIterator != mSubAccountHistory.end())
{
auto& subInfoMap = simIterator->second;
auto subInfoIter = subInfoMap.find(seq);
if (subInfoIter != subInfoMap.end())
{
subInfoIter->second.index_->stopHistorical_ = true;
}
if (!historyOnly)
{
simIterator->second.erase(seq);
if (simIterator->second.empty())
{
mSubAccountHistory.erase(simIterator);
}
}
JLOG(m_journal.debug())
<< "unsubAccountHistory, account " << toBase58(account)
<< ", historyOnly = " << (historyOnly ? "true" : "false");
}
}
bool
NetworkOPsImp::subBook(InfoSub::ref isrListener, Book const& book)
{

View File

@@ -24,6 +24,7 @@
#include <ripple/basics/CountedObject.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/Book.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/resource/Consumer.h>
#include <mutex>
@@ -82,6 +83,34 @@ public:
hash_set<AccountID> const& vnaAccountIDs,
bool realTime) = 0;
/**
* subscribe an account's new transactions and retrieve the account's
* historical transactions
* @return rpcSUCCESS if successful, otherwise an error code
*/
virtual error_code_i
subAccountHistory(ref ispListener, AccountID const& account) = 0;
/**
* unsubscribe an account's transactions
* @param historyOnly if true, only stop historical transactions
* @note once a client receives enough historical transactions,
* it should unsubscribe with historyOnly == true to stop receiving
* more historical transactions. It will continue to receive new
* transactions.
*/
virtual void
unsubAccountHistory(
ref ispListener,
AccountID const& account,
bool historyOnly) = 0;
virtual void
unsubAccountHistoryInternal(
std::uint64_t uListener,
AccountID const& account,
bool historyOnly) = 0;
// VFALCO TODO Document the bool return value
virtual bool
subLedger(ref ispListener, Json::Value& jvResult) = 0;
@@ -168,6 +197,13 @@ public:
void
deleteSubAccountInfo(AccountID const& account, bool rt);
// return false if already subscribed to this account
bool
insertSubAccountHistory(AccountID const& account);
void
deleteSubAccountHistory(AccountID const& account);
void
clearPathRequest();
@@ -187,6 +223,7 @@ private:
hash_set<AccountID> normalSubscriptions_;
std::shared_ptr<PathRequest> mPathRequest;
std::uint64_t mSeq;
hash_set<AccountID> accountHistorySubscriptions_;
static int
assign_id()

View File

@@ -60,6 +60,9 @@ InfoSub::~InfoSub()
if (!normalSubscriptions_.empty())
m_source.unsubAccountInternal(mSeq, normalSubscriptions_, false);
for (auto const& account : accountHistorySubscriptions_)
m_source.unsubAccountHistoryInternal(mSeq, account, false);
}
Resource::Consumer&
@@ -101,6 +104,20 @@ InfoSub::deleteSubAccountInfo(AccountID const& account, bool rt)
normalSubscriptions_.erase(account);
}
bool
InfoSub::insertSubAccountHistory(AccountID const& account)
{
std::lock_guard sl(mLock);
return accountHistorySubscriptions_.insert(account).second;
}
void
InfoSub::deleteSubAccountHistory(AccountID const& account)
{
std::lock_guard sl(mLock);
accountHistorySubscriptions_.erase(account);
}
void
InfoSub::clearPathRequest()
{

View File

@@ -112,6 +112,9 @@ JSS(account_objects); // out: AccountObjects
JSS(account_root); // in: LedgerEntry
JSS(account_sequence_next); // out: SubmitTransaction
JSS(account_sequence_available); // out: SubmitTransaction
JSS(account_history_tx_stream); // in: Subscribe, Unsubscribe
JSS(account_history_tx_index); // out: Account txn history subscribe
JSS(account_history_tx_first); // out: Account txn history subscribe
JSS(accounts); // in: LedgerEntry, Subscribe,
// handlers/Ledger, Unsubscribe
JSS(accounts_proposed); // in: Subscribe, Unsubscribe
@@ -518,29 +521,30 @@ JSS(source_tag); // out: AccountChannels
JSS(stand_alone); // out: NetworkOPs
JSS(start); // in: TxHistory
JSS(started);
JSS(state); // out: Logic.h, ServerState, LedgerData
JSS(state_accounting); // out: NetworkOPs
JSS(state_now); // in: Subscribe
JSS(status); // error
JSS(stop); // in: LedgerCleaner
JSS(storedSeqs); // out: NodeToShardStatus
JSS(streams); // in: Subscribe, Unsubscribe
JSS(strict); // in: AccountCurrencies, AccountInfo
JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind
JSS(success); // rpc
JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers
JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded); // out: NetworkOPs
JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded); // out: NetworkOPs
JSS(threshold); // in: Blacklist
JSS(ticket); // in: AccountObjects
JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(state); // out: Logic.h, ServerState, LedgerData
JSS(state_accounting); // out: NetworkOPs
JSS(state_now); // in: Subscribe
JSS(status); // error
JSS(stop); // in: LedgerCleaner
JSS(stop_history_tx_only); // in: Unsubscribe, stop history tx stream
JSS(storedSeqs); // out: NodeToShardStatus
JSS(streams); // in: Subscribe, Unsubscribe
JSS(strict); // in: AccountCurrencies, AccountInfo
JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind
JSS(success); // rpc
JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers
JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded); // out: NetworkOPs
JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded); // out: NetworkOPs
JSS(threshold); // in: Blacklist
JSS(ticket); // in: AccountObjects
JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(time);
JSS(timeouts); // out: InboundLedger
JSS(track); // out: PeerImp

View File

@@ -201,6 +201,33 @@ doSubscribe(RPC::JsonContext& context)
JLOG(context.j.debug()) << "doSubscribe: accounts: " << ids.size();
}
if (context.params.isMember(jss::account_history_tx_stream))
{
if (!context.app.config().useTxTables())
return rpcError(rpcNOT_ENABLED);
context.loadType = Resource::feeMediumBurdenRPC;
auto const& req = context.params[jss::account_history_tx_stream];
if (!req.isMember(jss::account) || !req[jss::account].isString())
return rpcError(rpcINVALID_PARAMS);
auto const id = parseBase58<AccountID>(req[jss::account].asString());
if (!id)
return rpcError(rpcINVALID_PARAMS);
if (auto result = context.netOps.subAccountHistory(ispSub, *id);
result != rpcSUCCESS)
{
return rpcError(result);
}
jvResult[jss::warning] =
"account_history_tx_stream is an experimental feature and likely "
"to be removed in the future";
JLOG(context.j.debug())
<< "doSubscribe: account_history_tx_stream: " << toBase58(*id);
}
if (context.params.isMember(jss::books))
{
if (!context.params[jss::books].isArray())

View File

@@ -134,6 +134,30 @@ doUnsubscribe(RPC::JsonContext& context)
context.netOps.unsubAccount(ispSub, ids, false);
}
if (context.params.isMember(jss::account_history_tx_stream))
{
auto const& req = context.params[jss::account_history_tx_stream];
if (!req.isMember(jss::account) || !req[jss::account].isString())
return rpcError(rpcINVALID_PARAMS);
auto const id = parseBase58<AccountID>(req[jss::account].asString());
if (!id)
return rpcError(rpcINVALID_PARAMS);
bool stopHistoryOnly = false;
if (req.isMember(jss::stop_history_tx_only))
{
if (!req[jss::stop_history_tx_only].isBool())
return rpcError(rpcINVALID_PARAMS);
stopHistoryOnly = req[jss::stop_history_tx_only].asBool();
}
context.netOps.unsubAccountHistory(ispSub, *id, stopHistoryOnly);
JLOG(context.j.debug())
<< "doUnsubscribe: account_history_tx_stream: " << toBase58(*id)
<< " stopHistoryOnly=" << (stopHistoryOnly ? "true" : "false");
}
if (context.params.isMember(jss::books))
{
if (!context.params[jss::books].isArray())

View File

@@ -735,6 +735,408 @@ public:
}
}
void
testHistoryTxStream()
{
testcase("HistoryTxStream");
using namespace std::chrono_literals;
using namespace jtx;
using IdxHashVec = std::vector<std::pair<int, std::string>>;
Account alice("alice");
Account bob("bob");
Account carol("carol");
Account david("david");
///////////////////////////////////////////////////////////////////
/*
* return true if the subscribe or unsubscribe result is a success
*/
auto goodSubRPC = [](Json::Value const& subReply) -> bool {
return subReply.isMember(jss::result) &&
subReply[jss::result].isMember(jss::status) &&
subReply[jss::result][jss::status] == jss::success;
};
/*
* try to receive txns from the tx stream subscription via the WSClient.
* return {true, true} if received numReplies replies and also
* received a tx with the account_history_tx_first == true
*/
auto getTxHash = [](WSClient& wsc,
IdxHashVec& v,
int numReplies) -> std::pair<bool, bool> {
bool first_flag = false;
for (int i = 0; i < numReplies; ++i)
{
std::uint32_t idx{0};
auto reply = wsc.getMsg(100ms);
if (reply)
{
auto r = *reply;
if (r.isMember(jss::account_history_tx_index))
idx = r[jss::account_history_tx_index].asInt();
if (r.isMember(jss::account_history_tx_first))
first_flag = true;
if (r.isMember(jss::transaction) &&
r[jss::transaction].isMember(jss::hash))
{
v.emplace_back(
idx, r[jss::transaction][jss::hash].asString());
continue;
}
}
return {false, first_flag};
}
return {true, first_flag};
};
/*
* send payments between the two accounts a and b,
* and close ledgersToClose ledgers
*/
auto sendPayments = [](Env& env,
Account const& a,
Account const& b,
int newTxns,
std::uint32_t ledgersToClose,
int numXRP = 10) {
env.memoize(a);
env.memoize(b);
for (int i = 0; i < newTxns; ++i)
{
auto& from = (i % 2 == 0) ? a : b;
auto& to = (i % 2 == 0) ? b : a;
env.apply(
pay(from, to, jtx::XRP(numXRP)),
jtx::seq(jtx::autofill),
jtx::fee(jtx::autofill),
jtx::sig(jtx::autofill));
}
for (int i = 0; i < ledgersToClose; ++i)
env.close();
return newTxns;
};
/*
* Check if txHistoryVec has every item of accountVec,
* and in the same order.
* If sizeCompare is false, txHistoryVec is allowed to be larger.
*/
auto hashCompare = [](IdxHashVec const& accountVec,
IdxHashVec const& txHistoryVec,
bool sizeCompare) -> bool {
if (accountVec.empty() || txHistoryVec.empty())
return false;
if (sizeCompare && accountVec.size() != (txHistoryVec.size()))
return false;
hash_map<std::string, int> txHistoryMap;
for (auto const& tx : txHistoryVec)
{
txHistoryMap.emplace(tx.second, tx.first);
}
auto getHistoryIndex = [&](std::size_t i) -> std::optional<int> {
if (i >= accountVec.size())
return {};
auto it = txHistoryMap.find(accountVec[i].second);
if (it == txHistoryMap.end())
return {};
return it->second;
};
auto firstHistoryIndex = getHistoryIndex(0);
if (!firstHistoryIndex)
return false;
for (std::size_t i = 1; i < accountVec.size(); ++i)
{
if (auto idx = getHistoryIndex(i);
!idx || *idx != *firstHistoryIndex + i)
return false;
}
return true;
};
///////////////////////////////////////////////////////////////////
{
/*
* subscribe to an account twice with same WS client,
* the second should fail
*
* also test subscribe to the account before it is created
*/
Env env(*this);
auto wscTxHistory = makeWSClient(env.app().config());
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
request[jss::account_history_tx_stream][jss::account] =
alice.human();
auto jv = wscTxHistory->invoke("subscribe", request);
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
jv = wscTxHistory->invoke("subscribe", request);
BEAST_EXPECT(!goodSubRPC(jv));
/*
* unsubscribe history only, future txns should still be streamed
*/
request[jss::account_history_tx_stream][jss::stop_history_tx_only] =
true;
jv = wscTxHistory->invoke("unsubscribe", request);
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
sendPayments(env, env.master, alice, 1, 1, 123456);
IdxHashVec vec;
auto r = getTxHash(*wscTxHistory, vec, 1);
if (!BEAST_EXPECT(r.first && r.second))
return;
/*
* unsubscribe, future txns should not be streamed
*/
request[jss::account_history_tx_stream][jss::stop_history_tx_only] =
false;
jv = wscTxHistory->invoke("unsubscribe", request);
BEAST_EXPECT(goodSubRPC(jv));
sendPayments(env, env.master, alice, 1, 1);
r = getTxHash(*wscTxHistory, vec, 1);
BEAST_EXPECT(!r.first);
}
{
/*
* subscribe genesis account tx history without txns
* subscribe to bob's account after it is created
*/
Env env(*this);
auto wscTxHistory = makeWSClient(env.app().config());
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
request[jss::account_history_tx_stream][jss::account] =
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
auto jv = wscTxHistory->invoke("subscribe", request);
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
IdxHashVec genesisFullHistoryVec;
if (!BEAST_EXPECT(
!getTxHash(*wscTxHistory, genesisFullHistoryVec, 1).first))
return;
/*
* create bob's account with one tx
* the two subscriptions should both stream it
*/
sendPayments(env, env.master, bob, 1, 1, 654321);
auto r = getTxHash(*wscTxHistory, genesisFullHistoryVec, 1);
if (!BEAST_EXPECT(r.first && r.second))
return;
request[jss::account_history_tx_stream][jss::account] = bob.human();
jv = wscTxHistory->invoke("subscribe", request);
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
IdxHashVec bobFullHistoryVec;
r = getTxHash(*wscTxHistory, bobFullHistoryVec, 1);
if (!BEAST_EXPECT(r.first && r.second))
return;
BEAST_EXPECT(
bobFullHistoryVec.back().second ==
genesisFullHistoryVec.back().second);
/*
* unsubscribe to prepare next test
*/
jv = wscTxHistory->invoke("unsubscribe", request);
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
request[jss::account_history_tx_stream][jss::account] =
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
jv = wscTxHistory->invoke("unsubscribe", request);
BEAST_EXPECT(goodSubRPC(jv));
/*
* add more txns, then subscribe bob tx history and
* genesis account tx history. Their earliest txns should match.
*/
sendPayments(env, env.master, bob, 30, 300);
wscTxHistory = makeWSClient(env.app().config());
request[jss::account_history_tx_stream][jss::account] = bob.human();
jv = wscTxHistory->invoke("subscribe", request);
bobFullHistoryVec.clear();
BEAST_EXPECT(
getTxHash(*wscTxHistory, bobFullHistoryVec, 31).second);
jv = wscTxHistory->invoke("unsubscribe", request);
request[jss::account_history_tx_stream][jss::account] =
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
jv = wscTxHistory->invoke("subscribe", request);
genesisFullHistoryVec.clear();
BEAST_EXPECT(
getTxHash(*wscTxHistory, genesisFullHistoryVec, 31).second);
jv = wscTxHistory->invoke("unsubscribe", request);
BEAST_EXPECT(
bobFullHistoryVec.back().second ==
genesisFullHistoryVec.back().second);
}
{
/*
* subscribe account and subscribe account tx history
* and compare txns streamed
*/
Env env(*this);
auto wscAccount = makeWSClient(env.app().config());
auto wscTxHistory = makeWSClient(env.app().config());
std::array<Account, 2> accounts = {alice, bob};
env.fund(XRP(222222), accounts);
env.close();
// subscribe account
Json::Value stream = Json::objectValue;
stream[jss::accounts] = Json::arrayValue;
stream[jss::accounts].append(alice.human());
auto jv = wscAccount->invoke("subscribe", stream);
sendPayments(env, alice, bob, 5, 1);
sendPayments(env, alice, bob, 5, 1);
IdxHashVec accountVec;
if (!BEAST_EXPECT(getTxHash(*wscAccount, accountVec, 10).first))
return;
// subscribe account tx history
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
request[jss::account_history_tx_stream][jss::account] =
alice.human();
jv = wscTxHistory->invoke("subscribe", request);
// compare historical txns
IdxHashVec txHistoryVec;
if (!BEAST_EXPECT(getTxHash(*wscTxHistory, txHistoryVec, 10).first))
return;
if (!BEAST_EXPECT(hashCompare(accountVec, txHistoryVec, true)))
return;
{
// take out all history txns from stream to prepare next test
IdxHashVec initFundTxns;
if (!BEAST_EXPECT(
getTxHash(*wscTxHistory, initFundTxns, 10).second))
return;
}
// compare future txns
sendPayments(env, alice, bob, 10, 1);
if (!BEAST_EXPECT(getTxHash(*wscAccount, accountVec, 10).first))
return;
if (!BEAST_EXPECT(getTxHash(*wscTxHistory, txHistoryVec, 10).first))
return;
if (!BEAST_EXPECT(hashCompare(accountVec, txHistoryVec, true)))
return;
wscTxHistory->invoke("unsubscribe", request);
wscAccount->invoke("unsubscribe", stream);
}
{
/*
* alice issues USD to carol
* mix USD and XRP payments
*/
Env env(*this);
auto const USD_a = alice["USD"];
std::array<Account, 2> accounts = {alice, carol};
env.fund(XRP(333333), accounts);
env.trust(USD_a(20000), carol);
env.close();
auto mixedPayments = [&]() -> int {
sendPayments(env, alice, carol, 1, 0);
env(pay(alice, carol, USD_a(100)));
env.close();
return 2;
};
// subscribe
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
request[jss::account_history_tx_stream][jss::account] =
carol.human();
auto ws = makeWSClient(env.app().config());
auto jv = ws->invoke("subscribe", request);
{
// take out existing txns from the stream
IdxHashVec tempVec;
getTxHash(*ws, tempVec, 100);
}
auto count = mixedPayments();
IdxHashVec vec1;
if (!BEAST_EXPECT(getTxHash(*ws, vec1, count).first))
return;
ws->invoke("unsubscribe", request);
}
{
/*
* long transaction history
*/
Env env(*this);
std::array<Account, 2> accounts = {alice, carol};
env.fund(XRP(444444), accounts);
env.close();
// many payments, and close lots of ledgers
auto oneRound = [&](int numPayments) {
return sendPayments(env, alice, carol, numPayments, 300);
};
// subscribe
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
request[jss::account_history_tx_stream][jss::account] =
carol.human();
auto wscLong = makeWSClient(env.app().config());
auto jv = wscLong->invoke("subscribe", request);
{
// take out existing txns from the stream
IdxHashVec tempVec;
getTxHash(*wscLong, tempVec, 100);
}
// repeat the payments many rounds
for (int kk = 2; kk < 10; ++kk)
{
auto count = oneRound(kk);
IdxHashVec vec1;
if (!BEAST_EXPECT(getTxHash(*wscLong, vec1, count).first))
return;
// another subscribe, only for this round
auto wscShort = makeWSClient(env.app().config());
auto jv = wscShort->invoke("subscribe", request);
IdxHashVec vec2;
if (!BEAST_EXPECT(getTxHash(*wscShort, vec2, count).first))
return;
if (!BEAST_EXPECT(hashCompare(vec1, vec2, true)))
return;
wscShort->invoke("unsubscribe", request);
}
}
}
void
run() override
{
@@ -746,6 +1148,7 @@ public:
testSubErrors(true);
testSubErrors(false);
testSubByUrl();
testHistoryTxStream();
}
};