Merge remote-tracking branch 'ripple/develop' into develop

This commit is contained in:
Richard Holland
2022-05-20 08:05:08 +00:00
304 changed files with 21096 additions and 7004 deletions

View File

@@ -37,9 +37,8 @@
#include <ripple/app/misc/ValidatorKeys.h>
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/app/misc/impl/AccountTxPaging.h>
#include <ripple/app/rdb/RelationalDBInterface.h>
#include <ripple/app/rdb/backend/RelationalDBInterfacePostgres.h>
#include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
#include <ripple/app/rdb/backend/PostgresDatabase.h>
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h>
#include <ripple/app/hook/applyHook.h>
@@ -446,9 +445,9 @@ public:
pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
void
pubProposedTransaction(
std::shared_ptr<ReadView const> const& lpCurrent,
std::shared_ptr<STTx const> const& stTxn,
TER terResult) override;
std::shared_ptr<ReadView const> const& ledger,
std::shared_ptr<STTx const> const& transaction,
TER result) override;
void
pubValidation(std::shared_ptr<STValidation> const& val) override;
@@ -613,20 +612,26 @@ private:
Json::Value
transJson(
const STTx& stTxn,
TER terResult,
bool bValidated,
std::shared_ptr<ReadView const> const& lpCurrent);
const STTx& transaction,
TER result,
bool validated,
std::shared_ptr<ReadView const> const& ledger);
void
pubValidatedTransaction(
std::shared_ptr<ReadView const> const& alAccepted,
const AcceptedLedgerTx& alTransaction);
std::shared_ptr<ReadView const> const& ledger,
AcceptedLedgerTx const& transaction);
void
pubAccountTransaction(
std::shared_ptr<ReadView const> const& lpCurrent,
const AcceptedLedgerTx& alTransaction,
bool isAccepted);
std::shared_ptr<ReadView const> const& ledger,
AcceptedLedgerTx const& transaction);
void
pubProposedAccountTransaction(
std::shared_ptr<ReadView const> const& ledger,
std::shared_ptr<STTx const> const& transaction,
TER result);
void
pubServer();
@@ -1200,6 +1205,7 @@ NetworkOPsImp::processTransaction(
if ((newFlags & SF_BAD) != 0)
{
// cached bad
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
return;
@@ -1798,7 +1804,7 @@ NetworkOPsImp::switchLastClosedLedger(
auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
std::optional<Rules> rules;
if (lastVal)
rules.emplace(*lastVal, app_.config().features);
rules = makeRulesGivenLedger(*lastVal, app_.config().features);
else
rules.emplace(app_.config().features);
app_.openLedger().accept(
@@ -2699,11 +2705,11 @@ NetworkOPsImp::getLedgerFetchInfo()
void
NetworkOPsImp::pubProposedTransaction(
std::shared_ptr<ReadView const> const& lpCurrent,
std::shared_ptr<STTx const> const& stTxn,
TER terResult)
std::shared_ptr<ReadView const> const& ledger,
std::shared_ptr<STTx const> const& transaction,
TER result)
{
Json::Value jvObj = transJson(*stTxn, terResult, false, lpCurrent);
Json::Value jvObj = transJson(*transaction, result, false, ledger);
{
std::lock_guard sl(mSubLock);
@@ -2724,10 +2730,8 @@ NetworkOPsImp::pubProposedTransaction(
}
}
}
AcceptedLedgerTx alt(
lpCurrent, stTxn, terResult, app_.accountIDCache(), app_.logs());
JLOG(m_journal.trace()) << "pubProposed: " << alt.getJson();
pubAccountTransaction(lpCurrent, alt, false);
pubProposedAccountTransaction(ledger, transaction, result);
}
void
@@ -2902,9 +2906,13 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
lpAccepted->info().hash, alpAccepted);
}
assert(alpAccepted->getLedger().get() == lpAccepted.get());
{
JLOG(m_journal.debug())
<< "Publishing ledger = " << lpAccepted->info().seq;
<< "Publishing ledger " << lpAccepted->info().seq << " "
<< lpAccepted->info().hash;
std::lock_guard sl(mSubLock);
if (!mStreamMaps[sLedger].empty())
@@ -2924,7 +2932,7 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
jvObj[jss::reserve_inc] =
lpAccepted->fees().increment.jsonClipped();
jvObj[jss::txn_count] = Json::UInt(alpAccepted->getTxnCount());
jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
if (mMode >= OperatingMode::SYNCING)
{
@@ -2938,10 +2946,6 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
InfoSub::pointer p = it->second.lock();
if (p)
{
JLOG(m_journal.debug())
<< "Publishing ledger = " << lpAccepted->info().seq
<< " : consumer = " << p->getConsumer()
<< " : obj = " << jvObj;
p->send(jvObj, true);
++it;
}
@@ -2973,9 +2977,8 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
}
// Don't lock since pubAcceptedTransaction is locking.
for (auto const& [_, accTx] : alpAccepted->getMap())
for (auto const& accTx : *alpAccepted)
{
(void)_;
JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
pubValidatedTransaction(lpAccepted, *accTx);
}
@@ -3025,26 +3028,26 @@ NetworkOPsImp::getLocalTxCount()
// transactions.
Json::Value
NetworkOPsImp::transJson(
const STTx& stTxn,
TER terResult,
bool bValidated,
std::shared_ptr<ReadView const> const& lpCurrent)
const STTx& transaction,
TER result,
bool validated,
std::shared_ptr<ReadView const> const& ledger)
{
Json::Value jvObj(Json::objectValue);
std::string sToken;
std::string sHuman;
transResultInfo(terResult, sToken, sHuman);
transResultInfo(result, sToken, sHuman);
jvObj[jss::type] = "transaction";
jvObj[jss::transaction] = stTxn.getJson(JsonOptions::none);
jvObj[jss::transaction] = transaction.getJson(JsonOptions::none);
if (bValidated)
if (validated)
{
jvObj[jss::ledger_index] = lpCurrent->info().seq;
jvObj[jss::ledger_hash] = to_string(lpCurrent->info().hash);
jvObj[jss::ledger_index] = ledger->info().seq;
jvObj[jss::ledger_hash] = to_string(ledger->info().hash);
jvObj[jss::transaction][jss::date] =
lpCurrent->info().closeTime.time_since_epoch().count();
ledger->info().closeTime.time_since_epoch().count();
jvObj[jss::validated] = true;
// WRITEME: Put the account next seq here
@@ -3052,24 +3055,24 @@ NetworkOPsImp::transJson(
else
{
jvObj[jss::validated] = false;
jvObj[jss::ledger_current_index] = lpCurrent->info().seq;
jvObj[jss::ledger_current_index] = ledger->info().seq;
}
jvObj[jss::status] = bValidated ? "closed" : "proposed";
jvObj[jss::status] = validated ? "closed" : "proposed";
jvObj[jss::engine_result] = sToken;
jvObj[jss::engine_result_code] = terResult;
jvObj[jss::engine_result_code] = result;
jvObj[jss::engine_result_message] = sHuman;
if (stTxn.getTxnType() == ttOFFER_CREATE)
if (transaction.getTxnType() == ttOFFER_CREATE)
{
auto const account = stTxn.getAccountID(sfAccount);
auto const amount = stTxn.getFieldAmount(sfTakerGets);
auto const account = transaction.getAccountID(sfAccount);
auto const amount = transaction.getFieldAmount(sfTakerGets);
// If the offer create is not self funded then add the owner balance
if (account != amount.issue().account)
{
auto const ownerFunds = accountFunds(
*lpCurrent,
*ledger,
account,
amount,
fhIGNORE_FREEZE,
@@ -3083,17 +3086,18 @@ NetworkOPsImp::transJson(
void
NetworkOPsImp::pubValidatedTransaction(
std::shared_ptr<ReadView const> const& alAccepted,
const AcceptedLedgerTx& alTx)
std::shared_ptr<ReadView const> const& ledger,
const AcceptedLedgerTx& transaction)
{
std::shared_ptr<STTx const> stTxn = alTx.getTxn();
Json::Value jvObj = transJson(*stTxn, alTx.getResult(), true, alAccepted);
auto const& stTxn = transaction.getTxn();
Json::Value jvObj =
transJson(*stTxn, transaction.getResult(), true, ledger);
if (auto const txMeta = alTx.getMeta())
{
jvObj[jss::meta] = txMeta->getJson(JsonOptions::none);
RPC::insertDeliveredAmount(
jvObj[jss::meta], *alAccepted, stTxn, *txMeta);
auto const& meta = transaction.getMeta();
jvObj[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, stTxn, meta);
}
{
@@ -3128,32 +3132,31 @@ NetworkOPsImp::pubValidatedTransaction(
it = mStreamMaps[sRTTransactions].erase(it);
}
}
app_.getOrderBookDB().processTxn(alAccepted, alTx, jvObj);
pubAccountTransaction(alAccepted, alTx, true);
if (transaction.getResult() == tesSUCCESS)
app_.getOrderBookDB().processTxn(ledger, transaction, jvObj);
pubAccountTransaction(ledger, transaction);
}
void
NetworkOPsImp::pubAccountTransaction(
std::shared_ptr<ReadView const> const& lpCurrent,
const AcceptedLedgerTx& alTx,
bool bAccepted)
std::shared_ptr<ReadView const> const& ledger,
AcceptedLedgerTx const& transaction)
{
hash_set<InfoSub::pointer> notify;
int iProposed = 0;
int iAccepted = 0;
std::vector<SubAccountHistoryInfo> accountHistoryNotify;
auto const currLedgerSeq = lpCurrent->seq();
auto const currLedgerSeq = ledger->seq();
{
std::lock_guard sl(mSubLock);
if (!bAccepted && mSubRTAccount.empty())
return;
if (!mSubAccount.empty() || (!mSubRTAccount.empty()) ||
if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
!mSubAccountHistory.empty())
{
for (auto const& affectedAccount : alTx.getAffected())
for (auto const& affectedAccount : transaction.getAffected())
{
if (auto simiIt = mSubRTAccount.find(affectedAccount);
simiIt != mSubRTAccount.end())
@@ -3175,80 +3178,140 @@ NetworkOPsImp::pubAccountTransaction(
}
}
if (bAccepted)
if (auto simiIt = mSubAccount.find(affectedAccount);
simiIt != mSubAccount.end())
{
if (auto simiIt = mSubAccount.find(affectedAccount);
simiIt != mSubAccount.end())
auto it = simiIt->second.begin();
while (it != simiIt->second.end())
{
auto it = simiIt->second.begin();
while (it != simiIt->second.end())
{
InfoSub::pointer p = it->second.lock();
InfoSub::pointer p = it->second.lock();
if (p)
{
notify.insert(p);
++it;
++iAccepted;
}
else
it = simiIt->second.erase(it);
if (p)
{
notify.insert(p);
++it;
++iAccepted;
}
else
it = simiIt->second.erase(it);
}
}
if (auto histoIt = mSubAccountHistory.find(affectedAccount);
histoIt != mSubAccountHistory.end())
{
auto& subs = histoIt->second;
auto it = subs.begin();
while (it != subs.end())
{
SubAccountHistoryInfoWeak const& info = it->second;
if (currLedgerSeq <= info.index_->separationLedgerSeq_)
{
++it;
continue;
}
if (auto isSptr = info.sinkWptr_.lock(); isSptr)
{
accountHistoryNotify.emplace_back(
SubAccountHistoryInfo{isSptr, info.index_});
++it;
}
else
{
it = subs.erase(it);
}
}
if (auto histoIt = mSubAccountHistory.find(affectedAccount);
histoIt != mSubAccountHistory.end())
{
auto& subs = histoIt->second;
auto it = subs.begin();
while (it != subs.end())
{
SubAccountHistoryInfoWeak const& info = it->second;
if (currLedgerSeq <=
info.index_->separationLedgerSeq_)
{
++it;
continue;
}
if (auto isSptr = info.sinkWptr_.lock(); isSptr)
{
accountHistoryNotify.emplace_back(
SubAccountHistoryInfo{isSptr, info.index_});
++it;
}
else
{
it = subs.erase(it);
}
}
if (subs.empty())
mSubAccountHistory.erase(histoIt);
}
if (subs.empty())
mSubAccountHistory.erase(histoIt);
}
}
}
}
JLOG(m_journal.trace())
<< "pubAccountTransaction:"
<< " iProposed=" << iProposed << " iAccepted=" << iAccepted;
<< "pubAccountTransaction: "
<< "proposed=" << iProposed << ", accepted=" << iAccepted;
if (!notify.empty() || !accountHistoryNotify.empty())
{
std::shared_ptr<STTx const> stTxn = alTx.getTxn();
Json::Value jvObj =
transJson(*stTxn, alTx.getResult(), bAccepted, lpCurrent);
auto const& stTxn = transaction.getTxn();
Json::Value jvObj =
transJson(*stTxn, transaction.getResult(), true, ledger);
if (alTx.isApplied())
{
if (auto const txMeta = alTx.getMeta())
auto const& meta = transaction.getMeta();
jvObj[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, stTxn, meta);
}
for (InfoSub::ref isrListener : notify)
isrListener->send(jvObj, true);
assert(!jvObj.isMember(jss::account_history_tx_stream));
for (auto& info : accountHistoryNotify)
{
auto& index = info.index_;
if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
jvObj[jss::account_history_tx_first] = true;
jvObj[jss::account_history_tx_index] = index->forwardTxIndex_++;
info.sink_->send(jvObj, true);
}
}
}
void
NetworkOPsImp::pubProposedAccountTransaction(
std::shared_ptr<ReadView const> const& ledger,
std::shared_ptr<STTx const> const& tx,
TER result)
{
hash_set<InfoSub::pointer> notify;
int iProposed = 0;
std::vector<SubAccountHistoryInfo> accountHistoryNotify;
{
std::lock_guard sl(mSubLock);
if (mSubRTAccount.empty())
return;
if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
!mSubAccountHistory.empty())
{
for (auto const& affectedAccount : tx->getMentionedAccounts())
{
jvObj[jss::meta] = txMeta->getJson(JsonOptions::none);
RPC::insertDeliveredAmount(
jvObj[jss::meta], *lpCurrent, stTxn, *txMeta);
if (auto simiIt = mSubRTAccount.find(affectedAccount);
simiIt != mSubRTAccount.end())
{
auto it = simiIt->second.begin();
while (it != simiIt->second.end())
{
InfoSub::pointer p = it->second.lock();
if (p)
{
notify.insert(p);
++it;
++iProposed;
}
else
it = simiIt->second.erase(it);
}
}
}
}
}
JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
if (!notify.empty() || !accountHistoryNotify.empty())
{
Json::Value jvObj = transJson(*tx, result, false, ledger);
for (InfoSub::ref isrListener : notify)
isrListener->send(jvObj, true);
@@ -3358,8 +3421,9 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
#ifdef RIPPLED_REPORTING
if (app_.config().reporting())
{
if (dynamic_cast<RelationalDBInterfacePostgres*>(
&app_.getRelationalDBInterface()))
// Use a dynamic_cast to return DatabaseType::None
// on failure.
if (dynamic_cast<PostgresDatabase*>(&app_.getRelationalDatabase()))
{
return DatabaseType::Postgres;
}
@@ -3367,16 +3431,18 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
}
else
{
if (dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface()))
// Use a dynamic_cast to return DatabaseType::None
// on failure.
if (dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
{
return DatabaseType::Sqlite;
}
return DatabaseType::None;
}
#else
if (dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface()))
// Use a dynamic_cast to return DatabaseType::None
// on failure.
if (dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
{
return DatabaseType::Sqlite;
}
@@ -3462,17 +3528,16 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
auto getMoreTxns =
[&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDBInterface::AccountTxMarker>
marker)
std::optional<RelationalDatabase::AccountTxMarker> marker)
-> std::optional<std::pair<
RelationalDBInterface::AccountTxs,
std::optional<RelationalDBInterface::AccountTxMarker>>> {
RelationalDatabase::AccountTxs,
std::optional<RelationalDatabase::AccountTxMarker>>> {
switch (dbType)
{
case Postgres: {
auto db = static_cast<RelationalDBInterfacePostgres*>(
&app_.getRelationalDBInterface());
RelationalDBInterface::AccountTxArgs args;
auto db = static_cast<PostgresDatabase*>(
&app_.getRelationalDatabase());
RelationalDatabase::AccountTxArgs args;
args.account = accountId;
LedgerRange range{minLedger, maxLedger};
args.ledger = range;
@@ -3488,7 +3553,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
}
if (auto txns =
std::get_if<RelationalDBInterface::AccountTxs>(
std::get_if<RelationalDatabase::AccountTxs>(
&txResult.transactions);
txns)
{
@@ -3504,9 +3569,9 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
}
}
case Sqlite: {
auto db = static_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface());
RelationalDBInterface::AccountTxPageOptions options{
auto db = static_cast<SQLiteDatabase*>(
&app_.getRelationalDatabase());
RelationalDatabase::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db->newestAccountTxPage(options);
}
@@ -3567,7 +3632,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
return;
}
std::optional<RelationalDBInterface::AccountTxMarker> marker{};
std::optional<RelationalDatabase::AccountTxMarker> marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult =