From e2a42184b9acae9062f2f6ff7d87d393572b300e Mon Sep 17 00:00:00 2001 From: Peng Wang Date: Wed, 31 Mar 2021 10:11:47 -0400 Subject: [PATCH] Implement tx stream with history --- src/ripple/app/misc/NetworkOPs.cpp | 712 ++++++++++++++++++++++-- src/ripple/net/InfoSub.h | 37 ++ src/ripple/net/impl/InfoSub.cpp | 17 + src/ripple/protocol/jss.h | 50 +- src/ripple/rpc/handlers/Subscribe.cpp | 27 + src/ripple/rpc/handlers/Unsubscribe.cpp | 24 + src/test/rpc/Subscribe_test.cpp | 403 ++++++++++++++ 7 files changed, 1203 insertions(+), 67 deletions(-) diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index fc7a4778a..1b3fcd1de 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -38,6 +38,8 @@ #include #include #include +#include +#include #include #include #include @@ -51,6 +53,7 @@ #include #include #include +#include #include #include #include @@ -58,6 +61,7 @@ #include #include #include +#include #include #include #include @@ -229,6 +233,7 @@ public: , mMode(start_valid ? OperatingMode::FULL : OperatingMode::DISCONNECTED) , heartbeatTimer_(io_svc) , clusterTimer_(io_svc) + , accountHistoryTxTimer_(io_svc) , mConsensus( app, make_FeeVote( @@ -481,6 +486,21 @@ public: hash_set const& vnaAccountIDs, bool rt) override; + error_code_i + subAccountHistory(InfoSub::ref ispListener, AccountID const& account) + override; + void + unsubAccountHistory( + InfoSub::ref ispListener, + AccountID const& account, + bool historyOnly) override; + + void + unsubAccountHistoryInternal( + std::uint64_t seq, + AccountID const& account, + bool historyOnly) override; + bool subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override; bool @@ -559,6 +579,15 @@ public: << "NetworkOPs: clusterTimer cancel error: " << ec.message(); } + + ec.clear(); + accountHistoryTxTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: accountHistoryTxTimer cancel error: " + << ec.message(); + } } // Make sure that any waitHandlers pending in our timers are done. using namespace std::chrono_literals; @@ -567,6 +596,12 @@ public: private: void + setTimer( + boost::asio::steady_timer& timer, + std::chrono::milliseconds const& expiry_time, + std::function onExpire, + std::function onError); + void setHeartbeatTimer(); void setClusterTimer(); @@ -605,6 +640,63 @@ private: using SubInfoMapType = hash_map; using subRpcMapType = hash_map; + /* + * With a validated ledger to separate history and future, the node + * streams historical txns with negative indexes starting from -1, + * and streams future txns starting from index 0. + * The SubAccountHistoryIndex struct maintains these indexes. + * It also has a flag stopHistorical_ for stopping streaming + * the historical txns. + */ + struct SubAccountHistoryIndex + { + AccountID const accountId_; + // forward + std::uint32_t forwardTxIndex_; + // separate backward and forward + std::uint32_t separationLedgerSeq_; + // history, backward + std::uint32_t historyLastLedgerSeq_; + std::int32_t historyTxIndex_; + bool haveHistorical_; + std::atomic stopHistorical_; + + SubAccountHistoryIndex(AccountID const& accountId) + : accountId_(accountId) + , forwardTxIndex_(0) + , separationLedgerSeq_(0) + , historyLastLedgerSeq_(0) + , historyTxIndex_(-1) + , haveHistorical_(false) + , stopHistorical_(false) + { + } + }; + struct SubAccountHistoryInfo + { + InfoSub::pointer sink_; + std::shared_ptr index_; + }; + struct SubAccountHistoryInfoWeak + { + InfoSub::wptr sinkWptr_; + std::shared_ptr index_; + }; + using SubAccountHistoryMapType = + hash_map>; + + /** + * @note called while holding mSubLock + */ + void + subAccountHistoryStart( + std::shared_ptr const& ledger, + SubAccountHistoryInfoWeak& subInfo); + void + addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo); + void + setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo); + Application& app_; beast::Journal m_journal; @@ -622,6 +714,7 @@ private: ClosureCounter waitHandlerCounter_; boost::asio::steady_timer heartbeatTimer_; boost::asio::steady_timer clusterTimer_; + boost::asio::steady_timer accountHistoryTxTimer_; RCLConsensus mConsensus; @@ -634,6 +727,8 @@ private: subRpcMapType mRpcSubMap; + SubAccountHistoryMapType mSubAccountHistory; + enum SubTypes { sLedger, // Accepted ledgers. sManifests, // Received validator manifests. @@ -742,6 +837,10 @@ std::array const Json::StaticString(stateNames[3]), Json::StaticString(stateNames[4])}}; +static auto const genesisAccountId = calcAccountID( + generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase")) + .first); + //------------------------------------------------------------------------------ inline OperatingMode NetworkOPsImp::getOperatingMode() const @@ -812,18 +911,19 @@ NetworkOPsImp::setStateTimer() } void -NetworkOPsImp::setHeartbeatTimer() +NetworkOPsImp::setTimer( + boost::asio::steady_timer& timer, + const std::chrono::milliseconds& expiry_time, + std::function onExpire, + std::function onError) { // Only start the timer if waitHandlerCounter_ is not yet joined. if (auto optionalCountedHandler = waitHandlerCounter_.wrap( - [this](boost::system::error_code const& e) { + [this, onExpire, onError](boost::system::error_code const& e) { if ((e.value() == boost::system::errc::success) && (!m_job_queue.isStopped())) { - m_job_queue.addJob( - jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) { - processHeartbeatTimer(); - }); + onExpire(); } // Recover as best we can if an unexpected error occurs. if (e.value() != boost::system::errc::success && @@ -831,47 +931,57 @@ NetworkOPsImp::setHeartbeatTimer() { // Try again later and hope for the best. JLOG(m_journal.error()) - << "Heartbeat timer got error '" << e.message() + << "Timer got error '" << e.message() << "'. Restarting timer."; - setHeartbeatTimer(); + onError(); } })) { - heartbeatTimer_.expires_from_now(mConsensus.parms().ledgerGRANULARITY); - heartbeatTimer_.async_wait(std::move(*optionalCountedHandler)); + timer.expires_from_now(expiry_time); + timer.async_wait(std::move(*optionalCountedHandler)); } } +void +NetworkOPsImp::setHeartbeatTimer() +{ + setTimer( + heartbeatTimer_, + mConsensus.parms().ledgerGRANULARITY, + [this]() { + m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) { + processHeartbeatTimer(); + }); + }, + [this]() { setHeartbeatTimer(); }); +} + void NetworkOPsImp::setClusterTimer() { - // Only start the timer if waitHandlerCounter_ is not yet joined. - if (auto optionalCountedHandler = waitHandlerCounter_.wrap( - [this](boost::system::error_code const& e) { - if ((e.value() == boost::system::errc::success) && - (!m_job_queue.isStopped())) - { - m_job_queue.addJob( - jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) { - processClusterTimer(); - }); - } - // Recover as best we can if an unexpected error occurs. - if (e.value() != boost::system::errc::success && - e.value() != boost::asio::error::operation_aborted) - { - // Try again later and hope for the best. - JLOG(m_journal.error()) - << "Cluster timer got error '" << e.message() - << "'. Restarting timer."; - setClusterTimer(); - } - })) - { - using namespace std::chrono_literals; - clusterTimer_.expires_from_now(10s); - clusterTimer_.async_wait(std::move(*optionalCountedHandler)); - } + using namespace std::chrono_literals; + setTimer( + clusterTimer_, + 10s, + [this]() { + m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) { + processClusterTimer(); + }); + }, + [this]() { setClusterTimer(); }); +} + +void +NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo) +{ + JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account " + << toBase58(subInfo.index_->accountId_); + using namespace std::chrono_literals; + setTimer( + accountHistoryTxTimer_, + 4s, + [this, subInfo]() { addAccountHistoryJob(subInfo); }, + [this, subInfo]() { setAccountHistoryJobTimer(subInfo); }); } void @@ -2780,6 +2890,27 @@ NetworkOPsImp::pubLedger(std::shared_ptr const& lpAccepted) it = mStreamMaps[sLedger].erase(it); } } + + { + static bool firstTime = true; + if (firstTime) + { + // First validated ledger, start delayed SubAccountHistory + firstTime = false; + for (auto& outer : mSubAccountHistory) + { + for (auto& inner : outer.second) + { + auto& subInfo = inner.second; + if (subInfo.index_->separationLedgerSeq_ == 0) + { + subAccountHistoryStart( + alpAccepted->getLedger(), subInfo); + } + } + } + } + } } // Don't lock since pubAcceptedTransaction is locking. @@ -2952,18 +3083,21 @@ NetworkOPsImp::pubAccountTransaction( int iProposed = 0; int iAccepted = 0; + std::vector accountHistoryNotify; + auto const currLedgerSeq = lpCurrent->seq(); { std::lock_guard sl(mSubLock); if (!bAccepted && mSubRTAccount.empty()) return; - if (!mSubAccount.empty() || (!mSubRTAccount.empty())) + if (!mSubAccount.empty() || (!mSubRTAccount.empty()) || + !mSubAccountHistory.empty()) { for (auto const& affectedAccount : alTx.getAffected()) { - auto simiIt = mSubRTAccount.find(affectedAccount); - if (simiIt != mSubRTAccount.end()) + if (auto simiIt = mSubRTAccount.find(affectedAccount); + simiIt != mSubRTAccount.end()) { auto it = simiIt->second.begin(); @@ -2984,9 +3118,8 @@ NetworkOPsImp::pubAccountTransaction( if (bAccepted) { - simiIt = mSubAccount.find(affectedAccount); - - if (simiIt != mSubAccount.end()) + if (auto simiIt = mSubAccount.find(affectedAccount); + simiIt != mSubAccount.end()) { auto it = simiIt->second.begin(); while (it != simiIt->second.end()) @@ -3003,15 +3136,46 @@ NetworkOPsImp::pubAccountTransaction( it = simiIt->second.erase(it); } } + + if (auto histoIt = mSubAccountHistory.find(affectedAccount); + histoIt != mSubAccountHistory.end()) + { + auto& subs = histoIt->second; + auto it = subs.begin(); + while (it != subs.end()) + { + SubAccountHistoryInfoWeak const& info = it->second; + if (currLedgerSeq <= + info.index_->separationLedgerSeq_) + { + ++it; + continue; + } + + if (auto isSptr = info.sinkWptr_.lock(); isSptr) + { + accountHistoryNotify.emplace_back( + SubAccountHistoryInfo{isSptr, info.index_}); + ++it; + } + else + { + it = subs.erase(it); + } + } + if (subs.empty()) + mSubAccountHistory.erase(histoIt); + } } } } } + JLOG(m_journal.trace()) << "pubAccountTransaction:" << " iProposed=" << iProposed << " iAccepted=" << iAccepted; - if (!notify.empty()) + if (!notify.empty() || !accountHistoryNotify.empty()) { std::shared_ptr stTxn = alTx.getTxn(); Json::Value jvObj = @@ -3029,6 +3193,16 @@ NetworkOPsImp::pubAccountTransaction( for (InfoSub::ref isrListener : notify) isrListener->send(jvObj, true); + + assert(!jvObj.isMember(jss::account_history_tx_stream)); + for (auto& info : accountHistoryNotify) + { + auto& index = info.index_; + if (index->forwardTxIndex_ == 0 && !index->haveHistorical_) + jvObj[jss::account_history_tx_first] = true; + jvObj[jss::account_history_tx_index] = index->forwardTxIndex_++; + info.sink_->send(jvObj, true); + } } } @@ -3117,6 +3291,456 @@ NetworkOPsImp::unsubAccountInternal( } } +void +NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo) +{ + enum DatabaseType { Postgres, Sqlite, None }; + static const auto databaseType = [&]() -> DatabaseType { +#ifdef RIPPLED_REPORTING + if (app_.config().reporting()) + { + if (dynamic_cast( + &app_.getRelationalDBInterface())) + { + return DatabaseType::Postgres; + } + return DatabaseType::None; + } + else + { + if (dynamic_cast( + &app_.getRelationalDBInterface())) + { + return DatabaseType::Sqlite; + } + return DatabaseType::None; + } +#else + if (dynamic_cast( + &app_.getRelationalDBInterface())) + { + return DatabaseType::Sqlite; + } + return DatabaseType::None; +#endif + }(); + + if (databaseType == DatabaseType::None) + { + JLOG(m_journal.error()) + << "AccountHistory job for account " + << toBase58(subInfo.index_->accountId_) << " no database"; + if (auto sptr = subInfo.sinkWptr_.lock(); sptr) + { + sptr->send(rpcError(rpcINTERNAL), true); + unsubAccountHistory(sptr, subInfo.index_->accountId_, false); + } + return; + } + + app_.getJobQueue().addJob( + jtCLIENT, + "AccountHistoryTxStream", + [this, dbType = databaseType, subInfo](Job&) { + auto const& accountId = subInfo.index_->accountId_; + auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_; + auto& txHistoryIndex = subInfo.index_->historyTxIndex_; + + JLOG(m_journal.trace()) + << "AccountHistory job for account " << toBase58(accountId) + << " started. lastLedgerSeq=" << lastLedgerSeq; + + auto isFirstTx = [&](std::shared_ptr const& tx, + std::shared_ptr const& meta) -> bool { + /* + * genesis account: first tx is the one with seq 1 + * other account: first tx is the one created the account + */ + if (accountId == genesisAccountId) + { + auto stx = tx->getSTransaction(); + if (stx->getAccountID(sfAccount) == accountId && + stx->getSeqProxy().value() == 1) + return true; + } + + for (auto& node : meta->getNodes()) + { + if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT) + continue; + + if (node.isFieldPresent(sfNewFields)) + { + if (auto inner = dynamic_cast( + node.peekAtPField(sfNewFields)); + inner) + { + if (inner->isFieldPresent(sfAccount) && + inner->getAccountID(sfAccount) == accountId) + { + return true; + } + } + } + } + + return false; + }; + + auto send = [&](Json::Value const& jvObj, + bool unsubscribe) -> bool { + if (auto sptr = subInfo.sinkWptr_.lock(); sptr) + { + sptr->send(jvObj, true); + if (unsubscribe) + unsubAccountHistory(sptr, accountId, false); + return true; + } + + return false; + }; + + auto getMoreTxns = + [&](std::uint32_t minLedger, + std::uint32_t maxLedger, + std::optional + marker) + -> std::optional>> { + switch (dbType) + { + case Postgres: { + auto db = static_cast( + &app_.getRelationalDBInterface()); + RelationalDBInterface::AccountTxArgs args; + args.account = accountId; + LedgerRange range{minLedger, maxLedger}; + args.ledger = range; + args.marker = marker; + auto [txResult, status] = db->getAccountTx(args); + if (status != rpcSUCCESS) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) + << " getAccountTx failed"; + return {}; + } + + if (auto txns = + std::get_if( + &txResult.transactions); + txns) + { + return std::make_pair(*txns, txResult.marker); + } + else + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) + << " getAccountTx wrong data"; + return {}; + } + } + case Sqlite: { + auto db = static_cast( + &app_.getRelationalDBInterface()); + RelationalDBInterface::AccountTxPageOptions options{ + accountId, minLedger, maxLedger, marker, 0, true}; + return db->newestAccountTxPage(options); + } + default: { + assert(false); + return {}; + } + } + }; + + /* + * search backward until the genesis ledger or asked to stop + */ + while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_) + { + int feeChargeCount = 0; + if (auto sptr = subInfo.sinkWptr_.lock(); sptr) + { + sptr->getConsumer().charge(Resource::feeMediumBurdenRPC); + ++feeChargeCount; + } + else + { + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) << " no InfoSub. Fee charged " + << feeChargeCount << " times."; + return; + } + + // try to search in 1024 ledgers till reaching genesis ledgers + auto startLedgerSeq = + (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2); + JLOG(m_journal.trace()) + << "AccountHistory job for account " << toBase58(accountId) + << ", working on ledger range [" << startLedgerSeq << "," + << lastLedgerSeq << "]"; + + auto haveRange = [&]() -> bool { + std::uint32_t validatedMin = UINT_MAX; + std::uint32_t validatedMax = 0; + auto haveSomeValidatedLedgers = + app_.getLedgerMaster().getValidatedRange( + validatedMin, validatedMax); + + return haveSomeValidatedLedgers && + validatedMin <= startLedgerSeq && + lastLedgerSeq <= validatedMax; + }(); + + if (!haveRange) + { + JLOG(m_journal.debug()) + << "AccountHistory reschedule job for account " + << toBase58(accountId) << ", incomplete ledger range [" + << startLedgerSeq << "," << lastLedgerSeq << "]"; + setAccountHistoryJobTimer(subInfo); + return; + } + + std::optional marker{}; + while (!subInfo.index_->stopHistorical_) + { + auto dbResult = + getMoreTxns(startLedgerSeq, lastLedgerSeq, marker); + if (!dbResult) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) << " getMoreTxns failed."; + send(rpcError(rpcINTERNAL), true); + return; + } + + auto const& txns = dbResult->first; + marker = dbResult->second; + for (auto const& [tx, meta] : txns) + { + if (!tx || !meta) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) << " empty tx or meta."; + send(rpcError(rpcINTERNAL), true); + return; + } + auto curTxLedger = + app_.getLedgerMaster().getLedgerBySeq( + tx->getLedger()); + if (!curTxLedger) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) << " no ledger."; + send(rpcError(rpcINTERNAL), true); + return; + } + std::shared_ptr stTxn = + tx->getSTransaction(); + if (!stTxn) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) + << " getSTransaction failed."; + send(rpcError(rpcINTERNAL), true); + return; + } + Json::Value jvTx = transJson( + *stTxn, meta->getResultTER(), true, curTxLedger); + jvTx[jss::meta] = meta->getJson(JsonOptions::none); + jvTx[jss::account_history_tx_index] = txHistoryIndex--; + RPC::insertDeliveredAmount( + jvTx[jss::meta], *curTxLedger, stTxn, *meta); + if (isFirstTx(tx, meta)) + { + jvTx[jss::account_history_tx_first] = true; + send(jvTx, false); + + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " done, found last tx."; + return; + } + else + { + send(jvTx, false); + } + } + + if (marker) + { + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " paging, marker=" << marker->ledgerSeq << ":" + << marker->txnSeq; + } + else + { + break; + } + } + + if (!subInfo.index_->stopHistorical_) + { + lastLedgerSeq = startLedgerSeq - 1; + if (lastLedgerSeq <= 1) + { + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " done, reached genesis ledger."; + return; + } + } + } + }); +} + +void +NetworkOPsImp::subAccountHistoryStart( + std::shared_ptr const& ledger, + SubAccountHistoryInfoWeak& subInfo) +{ + subInfo.index_->separationLedgerSeq_ = ledger->seq(); + auto const& accountId = subInfo.index_->accountId_; + auto const accountKeylet = keylet::account(accountId); + if (!ledger->exists(accountKeylet)) + { + JLOG(m_journal.debug()) + << "subAccountHistoryStart, no account " << toBase58(accountId) + << ", no need to add AccountHistory job."; + return; + } + if (accountId == genesisAccountId) + { + if (auto const sleAcct = ledger->read(accountKeylet); sleAcct) + { + if (sleAcct->getFieldU32(sfSequence) == 1) + { + JLOG(m_journal.debug()) + << "subAccountHistoryStart, genesis account " + << toBase58(accountId) + << " does not have tx, no need to add AccountHistory job."; + return; + } + } + else + { + assert(false); + return; + } + } + subInfo.index_->historyLastLedgerSeq_ = ledger->seq(); + subInfo.index_->haveHistorical_ = true; + + JLOG(m_journal.debug()) + << "subAccountHistoryStart, add AccountHistory job: accountId=" + << toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq(); + + addAccountHistoryJob(subInfo); +} + +error_code_i +NetworkOPsImp::subAccountHistory( + InfoSub::ref isrListener, + AccountID const& accountId) +{ + if (!isrListener->insertSubAccountHistory(accountId)) + { + JLOG(m_journal.debug()) + << "subAccountHistory, already subscribed to account " + << toBase58(accountId); + return rpcINVALID_PARAMS; + } + + std::lock_guard sl(mSubLock); + SubAccountHistoryInfoWeak ahi{ + isrListener, std::make_shared(accountId)}; + auto simIterator = mSubAccountHistory.find(accountId); + if (simIterator == mSubAccountHistory.end()) + { + hash_map inner; + inner.emplace(isrListener->getSeq(), ahi); + mSubAccountHistory.insert( + simIterator, std::make_pair(accountId, inner)); + } + else + { + simIterator->second.emplace(isrListener->getSeq(), ahi); + } + + auto const ledger = app_.getLedgerMaster().getValidatedLedger(); + if (ledger) + { + subAccountHistoryStart(ledger, ahi); + } + else + { + // The node does not have validated ledgers, so wait for + // one before start streaming. + // In this case, the subscription is also considered successful. + JLOG(m_journal.debug()) + << "subAccountHistory, no validated ledger yet, delay start"; + } + + return rpcSUCCESS; +} + +void +NetworkOPsImp::unsubAccountHistory( + InfoSub::ref isrListener, + AccountID const& account, + bool historyOnly) +{ + if (!historyOnly) + isrListener->deleteSubAccountHistory(account); + unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly); +} + +void +NetworkOPsImp::unsubAccountHistoryInternal( + std::uint64_t seq, + const AccountID& account, + bool historyOnly) +{ + std::lock_guard sl(mSubLock); + auto simIterator = mSubAccountHistory.find(account); + if (simIterator != mSubAccountHistory.end()) + { + auto& subInfoMap = simIterator->second; + auto subInfoIter = subInfoMap.find(seq); + if (subInfoIter != subInfoMap.end()) + { + subInfoIter->second.index_->stopHistorical_ = true; + } + + if (!historyOnly) + { + simIterator->second.erase(seq); + if (simIterator->second.empty()) + { + mSubAccountHistory.erase(simIterator); + } + } + JLOG(m_journal.debug()) + << "unsubAccountHistory, account " << toBase58(account) + << ", historyOnly = " << (historyOnly ? "true" : "false"); + } +} + bool NetworkOPsImp::subBook(InfoSub::ref isrListener, Book const& book) { diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index c776e6f99..bc6460ea8 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -82,6 +83,34 @@ public: hash_set const& vnaAccountIDs, bool realTime) = 0; + /** + * subscribe an account's new transactions and retrieve the account's + * historical transactions + * @return rpcSUCCESS if successful, otherwise an error code + */ + virtual error_code_i + subAccountHistory(ref ispListener, AccountID const& account) = 0; + + /** + * unsubscribe an account's transactions + * @param historyOnly if true, only stop historical transactions + * @note once a client receives enough historical transactions, + * it should unsubscribe with historyOnly == true to stop receiving + * more historical transactions. It will continue to receive new + * transactions. + */ + virtual void + unsubAccountHistory( + ref ispListener, + AccountID const& account, + bool historyOnly) = 0; + + virtual void + unsubAccountHistoryInternal( + std::uint64_t uListener, + AccountID const& account, + bool historyOnly) = 0; + // VFALCO TODO Document the bool return value virtual bool subLedger(ref ispListener, Json::Value& jvResult) = 0; @@ -168,6 +197,13 @@ public: void deleteSubAccountInfo(AccountID const& account, bool rt); + // return false if already subscribed to this account + bool + insertSubAccountHistory(AccountID const& account); + + void + deleteSubAccountHistory(AccountID const& account); + void clearPathRequest(); @@ -187,6 +223,7 @@ private: hash_set normalSubscriptions_; std::shared_ptr mPathRequest; std::uint64_t mSeq; + hash_set accountHistorySubscriptions_; static int assign_id() diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index 19cdb5364..26849f29c 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -60,6 +60,9 @@ InfoSub::~InfoSub() if (!normalSubscriptions_.empty()) m_source.unsubAccountInternal(mSeq, normalSubscriptions_, false); + + for (auto const& account : accountHistorySubscriptions_) + m_source.unsubAccountHistoryInternal(mSeq, account, false); } Resource::Consumer& @@ -101,6 +104,20 @@ InfoSub::deleteSubAccountInfo(AccountID const& account, bool rt) normalSubscriptions_.erase(account); } +bool +InfoSub::insertSubAccountHistory(AccountID const& account) +{ + std::lock_guard sl(mLock); + return accountHistorySubscriptions_.insert(account).second; +} + +void +InfoSub::deleteSubAccountHistory(AccountID const& account) +{ + std::lock_guard sl(mLock); + accountHistorySubscriptions_.erase(account); +} + void InfoSub::clearPathRequest() { diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index f1218474a..361de64ad 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -112,6 +112,9 @@ JSS(account_objects); // out: AccountObjects JSS(account_root); // in: LedgerEntry JSS(account_sequence_next); // out: SubmitTransaction JSS(account_sequence_available); // out: SubmitTransaction +JSS(account_history_tx_stream); // in: Subscribe, Unsubscribe +JSS(account_history_tx_index); // out: Account txn history subscribe +JSS(account_history_tx_first); // out: Account txn history subscribe JSS(accounts); // in: LedgerEntry, Subscribe, // handlers/Ledger, Unsubscribe JSS(accounts_proposed); // in: Subscribe, Unsubscribe @@ -518,29 +521,30 @@ JSS(source_tag); // out: AccountChannels JSS(stand_alone); // out: NetworkOPs JSS(start); // in: TxHistory JSS(started); -JSS(state); // out: Logic.h, ServerState, LedgerData -JSS(state_accounting); // out: NetworkOPs -JSS(state_now); // in: Subscribe -JSS(status); // error -JSS(stop); // in: LedgerCleaner -JSS(storedSeqs); // out: NodeToShardStatus -JSS(streams); // in: Subscribe, Unsubscribe -JSS(strict); // in: AccountCurrencies, AccountInfo -JSS(sub_index); // in: LedgerEntry -JSS(subcommand); // in: PathFind -JSS(success); // rpc -JSS(supported); // out: AmendmentTableImpl -JSS(system_time_offset); // out: NetworkOPs -JSS(tag); // out: Peers -JSS(taker); // in: Subscribe, BookOffers -JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers -JSS(taker_gets_funded); // out: NetworkOPs -JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers -JSS(taker_pays_funded); // out: NetworkOPs -JSS(threshold); // in: Blacklist -JSS(ticket); // in: AccountObjects -JSS(ticket_count); // out: AccountInfo -JSS(ticket_seq); // in: LedgerEntry +JSS(state); // out: Logic.h, ServerState, LedgerData +JSS(state_accounting); // out: NetworkOPs +JSS(state_now); // in: Subscribe +JSS(status); // error +JSS(stop); // in: LedgerCleaner +JSS(stop_history_tx_only); // in: Unsubscribe, stop history tx stream +JSS(storedSeqs); // out: NodeToShardStatus +JSS(streams); // in: Subscribe, Unsubscribe +JSS(strict); // in: AccountCurrencies, AccountInfo +JSS(sub_index); // in: LedgerEntry +JSS(subcommand); // in: PathFind +JSS(success); // rpc +JSS(supported); // out: AmendmentTableImpl +JSS(system_time_offset); // out: NetworkOPs +JSS(tag); // out: Peers +JSS(taker); // in: Subscribe, BookOffers +JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers +JSS(taker_gets_funded); // out: NetworkOPs +JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers +JSS(taker_pays_funded); // out: NetworkOPs +JSS(threshold); // in: Blacklist +JSS(ticket); // in: AccountObjects +JSS(ticket_count); // out: AccountInfo +JSS(ticket_seq); // in: LedgerEntry JSS(time); JSS(timeouts); // out: InboundLedger JSS(track); // out: PeerImp diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index 3cbea6ab6..a3c1d1e1c 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -201,6 +201,33 @@ doSubscribe(RPC::JsonContext& context) JLOG(context.j.debug()) << "doSubscribe: accounts: " << ids.size(); } + if (context.params.isMember(jss::account_history_tx_stream)) + { + if (!context.app.config().useTxTables()) + return rpcError(rpcNOT_ENABLED); + + context.loadType = Resource::feeMediumBurdenRPC; + auto const& req = context.params[jss::account_history_tx_stream]; + if (!req.isMember(jss::account) || !req[jss::account].isString()) + return rpcError(rpcINVALID_PARAMS); + + auto const id = parseBase58(req[jss::account].asString()); + if (!id) + return rpcError(rpcINVALID_PARAMS); + + if (auto result = context.netOps.subAccountHistory(ispSub, *id); + result != rpcSUCCESS) + { + return rpcError(result); + } + + jvResult[jss::warning] = + "account_history_tx_stream is an experimental feature and likely " + "to be removed in the future"; + JLOG(context.j.debug()) + << "doSubscribe: account_history_tx_stream: " << toBase58(*id); + } + if (context.params.isMember(jss::books)) { if (!context.params[jss::books].isArray()) diff --git a/src/ripple/rpc/handlers/Unsubscribe.cpp b/src/ripple/rpc/handlers/Unsubscribe.cpp index d8840b09e..8a606a26d 100644 --- a/src/ripple/rpc/handlers/Unsubscribe.cpp +++ b/src/ripple/rpc/handlers/Unsubscribe.cpp @@ -134,6 +134,30 @@ doUnsubscribe(RPC::JsonContext& context) context.netOps.unsubAccount(ispSub, ids, false); } + if (context.params.isMember(jss::account_history_tx_stream)) + { + auto const& req = context.params[jss::account_history_tx_stream]; + if (!req.isMember(jss::account) || !req[jss::account].isString()) + return rpcError(rpcINVALID_PARAMS); + + auto const id = parseBase58(req[jss::account].asString()); + if (!id) + return rpcError(rpcINVALID_PARAMS); + + bool stopHistoryOnly = false; + if (req.isMember(jss::stop_history_tx_only)) + { + if (!req[jss::stop_history_tx_only].isBool()) + return rpcError(rpcINVALID_PARAMS); + stopHistoryOnly = req[jss::stop_history_tx_only].asBool(); + } + context.netOps.unsubAccountHistory(ispSub, *id, stopHistoryOnly); + + JLOG(context.j.debug()) + << "doUnsubscribe: account_history_tx_stream: " << toBase58(*id) + << " stopHistoryOnly=" << (stopHistoryOnly ? "true" : "false"); + } + if (context.params.isMember(jss::books)) { if (!context.params[jss::books].isArray()) diff --git a/src/test/rpc/Subscribe_test.cpp b/src/test/rpc/Subscribe_test.cpp index 65a85ef25..1a0773f26 100644 --- a/src/test/rpc/Subscribe_test.cpp +++ b/src/test/rpc/Subscribe_test.cpp @@ -735,6 +735,408 @@ public: } } + void + testHistoryTxStream() + { + testcase("HistoryTxStream"); + + using namespace std::chrono_literals; + using namespace jtx; + using IdxHashVec = std::vector>; + + Account alice("alice"); + Account bob("bob"); + Account carol("carol"); + Account david("david"); + /////////////////////////////////////////////////////////////////// + + /* + * return true if the subscribe or unsubscribe result is a success + */ + auto goodSubRPC = [](Json::Value const& subReply) -> bool { + return subReply.isMember(jss::result) && + subReply[jss::result].isMember(jss::status) && + subReply[jss::result][jss::status] == jss::success; + }; + + /* + * try to receive txns from the tx stream subscription via the WSClient. + * return {true, true} if received numReplies replies and also + * received a tx with the account_history_tx_first == true + */ + auto getTxHash = [](WSClient& wsc, + IdxHashVec& v, + int numReplies) -> std::pair { + bool first_flag = false; + + for (int i = 0; i < numReplies; ++i) + { + std::uint32_t idx{0}; + auto reply = wsc.getMsg(100ms); + if (reply) + { + auto r = *reply; + if (r.isMember(jss::account_history_tx_index)) + idx = r[jss::account_history_tx_index].asInt(); + if (r.isMember(jss::account_history_tx_first)) + first_flag = true; + if (r.isMember(jss::transaction) && + r[jss::transaction].isMember(jss::hash)) + { + v.emplace_back( + idx, r[jss::transaction][jss::hash].asString()); + continue; + } + } + return {false, first_flag}; + } + + return {true, first_flag}; + }; + + /* + * send payments between the two accounts a and b, + * and close ledgersToClose ledgers + */ + auto sendPayments = [](Env& env, + Account const& a, + Account const& b, + int newTxns, + std::uint32_t ledgersToClose, + int numXRP = 10) { + env.memoize(a); + env.memoize(b); + for (int i = 0; i < newTxns; ++i) + { + auto& from = (i % 2 == 0) ? a : b; + auto& to = (i % 2 == 0) ? b : a; + env.apply( + pay(from, to, jtx::XRP(numXRP)), + jtx::seq(jtx::autofill), + jtx::fee(jtx::autofill), + jtx::sig(jtx::autofill)); + } + for (int i = 0; i < ledgersToClose; ++i) + env.close(); + return newTxns; + }; + + /* + * Check if txHistoryVec has every item of accountVec, + * and in the same order. + * If sizeCompare is false, txHistoryVec is allowed to be larger. + */ + auto hashCompare = [](IdxHashVec const& accountVec, + IdxHashVec const& txHistoryVec, + bool sizeCompare) -> bool { + if (accountVec.empty() || txHistoryVec.empty()) + return false; + if (sizeCompare && accountVec.size() != (txHistoryVec.size())) + return false; + + hash_map txHistoryMap; + for (auto const& tx : txHistoryVec) + { + txHistoryMap.emplace(tx.second, tx.first); + } + + auto getHistoryIndex = [&](std::size_t i) -> std::optional { + if (i >= accountVec.size()) + return {}; + auto it = txHistoryMap.find(accountVec[i].second); + if (it == txHistoryMap.end()) + return {}; + return it->second; + }; + + auto firstHistoryIndex = getHistoryIndex(0); + if (!firstHistoryIndex) + return false; + for (std::size_t i = 1; i < accountVec.size(); ++i) + { + if (auto idx = getHistoryIndex(i); + !idx || *idx != *firstHistoryIndex + i) + return false; + } + return true; + }; + + /////////////////////////////////////////////////////////////////// + + { + /* + * subscribe to an account twice with same WS client, + * the second should fail + * + * also test subscribe to the account before it is created + */ + Env env(*this); + auto wscTxHistory = makeWSClient(env.app().config()); + Json::Value request; + request[jss::account_history_tx_stream] = Json::objectValue; + request[jss::account_history_tx_stream][jss::account] = + alice.human(); + auto jv = wscTxHistory->invoke("subscribe", request); + if (!BEAST_EXPECT(goodSubRPC(jv))) + return; + jv = wscTxHistory->invoke("subscribe", request); + BEAST_EXPECT(!goodSubRPC(jv)); + + /* + * unsubscribe history only, future txns should still be streamed + */ + request[jss::account_history_tx_stream][jss::stop_history_tx_only] = + true; + jv = wscTxHistory->invoke("unsubscribe", request); + if (!BEAST_EXPECT(goodSubRPC(jv))) + return; + + sendPayments(env, env.master, alice, 1, 1, 123456); + + IdxHashVec vec; + auto r = getTxHash(*wscTxHistory, vec, 1); + if (!BEAST_EXPECT(r.first && r.second)) + return; + + /* + * unsubscribe, future txns should not be streamed + */ + request[jss::account_history_tx_stream][jss::stop_history_tx_only] = + false; + jv = wscTxHistory->invoke("unsubscribe", request); + BEAST_EXPECT(goodSubRPC(jv)); + + sendPayments(env, env.master, alice, 1, 1); + r = getTxHash(*wscTxHistory, vec, 1); + BEAST_EXPECT(!r.first); + } + + { + /* + * subscribe genesis account tx history without txns + * subscribe to bob's account after it is created + */ + Env env(*this); + auto wscTxHistory = makeWSClient(env.app().config()); + Json::Value request; + request[jss::account_history_tx_stream] = Json::objectValue; + request[jss::account_history_tx_stream][jss::account] = + "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"; + auto jv = wscTxHistory->invoke("subscribe", request); + if (!BEAST_EXPECT(goodSubRPC(jv))) + return; + IdxHashVec genesisFullHistoryVec; + if (!BEAST_EXPECT( + !getTxHash(*wscTxHistory, genesisFullHistoryVec, 1).first)) + return; + + /* + * create bob's account with one tx + * the two subscriptions should both stream it + */ + sendPayments(env, env.master, bob, 1, 1, 654321); + + auto r = getTxHash(*wscTxHistory, genesisFullHistoryVec, 1); + if (!BEAST_EXPECT(r.first && r.second)) + return; + + request[jss::account_history_tx_stream][jss::account] = bob.human(); + jv = wscTxHistory->invoke("subscribe", request); + if (!BEAST_EXPECT(goodSubRPC(jv))) + return; + IdxHashVec bobFullHistoryVec; + r = getTxHash(*wscTxHistory, bobFullHistoryVec, 1); + if (!BEAST_EXPECT(r.first && r.second)) + return; + BEAST_EXPECT( + bobFullHistoryVec.back().second == + genesisFullHistoryVec.back().second); + + /* + * unsubscribe to prepare next test + */ + jv = wscTxHistory->invoke("unsubscribe", request); + if (!BEAST_EXPECT(goodSubRPC(jv))) + return; + request[jss::account_history_tx_stream][jss::account] = + "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"; + jv = wscTxHistory->invoke("unsubscribe", request); + BEAST_EXPECT(goodSubRPC(jv)); + + /* + * add more txns, then subscribe bob tx history and + * genesis account tx history. Their earliest txns should match. + */ + sendPayments(env, env.master, bob, 30, 300); + wscTxHistory = makeWSClient(env.app().config()); + request[jss::account_history_tx_stream][jss::account] = bob.human(); + jv = wscTxHistory->invoke("subscribe", request); + + bobFullHistoryVec.clear(); + BEAST_EXPECT( + getTxHash(*wscTxHistory, bobFullHistoryVec, 31).second); + jv = wscTxHistory->invoke("unsubscribe", request); + + request[jss::account_history_tx_stream][jss::account] = + "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"; + jv = wscTxHistory->invoke("subscribe", request); + genesisFullHistoryVec.clear(); + BEAST_EXPECT( + getTxHash(*wscTxHistory, genesisFullHistoryVec, 31).second); + jv = wscTxHistory->invoke("unsubscribe", request); + + BEAST_EXPECT( + bobFullHistoryVec.back().second == + genesisFullHistoryVec.back().second); + } + + { + /* + * subscribe account and subscribe account tx history + * and compare txns streamed + */ + Env env(*this); + auto wscAccount = makeWSClient(env.app().config()); + auto wscTxHistory = makeWSClient(env.app().config()); + + std::array accounts = {alice, bob}; + env.fund(XRP(222222), accounts); + env.close(); + + // subscribe account + Json::Value stream = Json::objectValue; + stream[jss::accounts] = Json::arrayValue; + stream[jss::accounts].append(alice.human()); + auto jv = wscAccount->invoke("subscribe", stream); + + sendPayments(env, alice, bob, 5, 1); + sendPayments(env, alice, bob, 5, 1); + IdxHashVec accountVec; + if (!BEAST_EXPECT(getTxHash(*wscAccount, accountVec, 10).first)) + return; + + // subscribe account tx history + Json::Value request; + request[jss::account_history_tx_stream] = Json::objectValue; + request[jss::account_history_tx_stream][jss::account] = + alice.human(); + jv = wscTxHistory->invoke("subscribe", request); + + // compare historical txns + IdxHashVec txHistoryVec; + if (!BEAST_EXPECT(getTxHash(*wscTxHistory, txHistoryVec, 10).first)) + return; + if (!BEAST_EXPECT(hashCompare(accountVec, txHistoryVec, true))) + return; + + { + // take out all history txns from stream to prepare next test + IdxHashVec initFundTxns; + if (!BEAST_EXPECT( + getTxHash(*wscTxHistory, initFundTxns, 10).second)) + return; + } + + // compare future txns + sendPayments(env, alice, bob, 10, 1); + if (!BEAST_EXPECT(getTxHash(*wscAccount, accountVec, 10).first)) + return; + if (!BEAST_EXPECT(getTxHash(*wscTxHistory, txHistoryVec, 10).first)) + return; + if (!BEAST_EXPECT(hashCompare(accountVec, txHistoryVec, true))) + return; + wscTxHistory->invoke("unsubscribe", request); + wscAccount->invoke("unsubscribe", stream); + } + + { + /* + * alice issues USD to carol + * mix USD and XRP payments + */ + Env env(*this); + auto const USD_a = alice["USD"]; + + std::array accounts = {alice, carol}; + env.fund(XRP(333333), accounts); + env.trust(USD_a(20000), carol); + env.close(); + + auto mixedPayments = [&]() -> int { + sendPayments(env, alice, carol, 1, 0); + env(pay(alice, carol, USD_a(100))); + env.close(); + return 2; + }; + + // subscribe + Json::Value request; + request[jss::account_history_tx_stream] = Json::objectValue; + request[jss::account_history_tx_stream][jss::account] = + carol.human(); + auto ws = makeWSClient(env.app().config()); + auto jv = ws->invoke("subscribe", request); + { + // take out existing txns from the stream + IdxHashVec tempVec; + getTxHash(*ws, tempVec, 100); + } + + auto count = mixedPayments(); + IdxHashVec vec1; + if (!BEAST_EXPECT(getTxHash(*ws, vec1, count).first)) + return; + ws->invoke("unsubscribe", request); + } + + { + /* + * long transaction history + */ + Env env(*this); + std::array accounts = {alice, carol}; + env.fund(XRP(444444), accounts); + env.close(); + + // many payments, and close lots of ledgers + auto oneRound = [&](int numPayments) { + return sendPayments(env, alice, carol, numPayments, 300); + }; + + // subscribe + Json::Value request; + request[jss::account_history_tx_stream] = Json::objectValue; + request[jss::account_history_tx_stream][jss::account] = + carol.human(); + auto wscLong = makeWSClient(env.app().config()); + auto jv = wscLong->invoke("subscribe", request); + { + // take out existing txns from the stream + IdxHashVec tempVec; + getTxHash(*wscLong, tempVec, 100); + } + + // repeat the payments many rounds + for (int kk = 2; kk < 10; ++kk) + { + auto count = oneRound(kk); + IdxHashVec vec1; + if (!BEAST_EXPECT(getTxHash(*wscLong, vec1, count).first)) + return; + + // another subscribe, only for this round + auto wscShort = makeWSClient(env.app().config()); + auto jv = wscShort->invoke("subscribe", request); + IdxHashVec vec2; + if (!BEAST_EXPECT(getTxHash(*wscShort, vec2, count).first)) + return; + if (!BEAST_EXPECT(hashCompare(vec1, vec2, true))) + return; + wscShort->invoke("unsubscribe", request); + } + } + } + void run() override { @@ -746,6 +1148,7 @@ public: testSubErrors(true); testSubErrors(false); testSubByUrl(); + testHistoryTxStream(); } };