20#include <xrpld/app/consensus/RCLConsensus.h>
21#include <xrpld/app/consensus/RCLValidations.h>
22#include <xrpld/app/ledger/AcceptedLedger.h>
23#include <xrpld/app/ledger/InboundLedgers.h>
24#include <xrpld/app/ledger/LedgerMaster.h>
25#include <xrpld/app/ledger/LedgerToJson.h>
26#include <xrpld/app/ledger/LocalTxs.h>
27#include <xrpld/app/ledger/OpenLedger.h>
28#include <xrpld/app/ledger/OrderBookDB.h>
29#include <xrpld/app/ledger/TransactionMaster.h>
30#include <xrpld/app/main/LoadManager.h>
31#include <xrpld/app/main/Tuning.h>
32#include <xrpld/app/misc/AmendmentTable.h>
33#include <xrpld/app/misc/DeliverMax.h>
34#include <xrpld/app/misc/HashRouter.h>
35#include <xrpld/app/misc/LoadFeeTrack.h>
36#include <xrpld/app/misc/NetworkOPs.h>
37#include <xrpld/app/misc/Transaction.h>
38#include <xrpld/app/misc/TxQ.h>
39#include <xrpld/app/misc/ValidatorKeys.h>
40#include <xrpld/app/misc/ValidatorList.h>
41#include <xrpld/app/misc/detail/AccountTxPaging.h>
42#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
43#include <xrpld/app/tx/apply.h>
44#include <xrpld/consensus/Consensus.h>
45#include <xrpld/consensus/ConsensusParms.h>
46#include <xrpld/overlay/Cluster.h>
47#include <xrpld/overlay/Overlay.h>
48#include <xrpld/overlay/predicates.h>
49#include <xrpld/perflog/PerfLog.h>
50#include <xrpld/rpc/BookChanges.h>
51#include <xrpld/rpc/CTID.h>
52#include <xrpld/rpc/DeliveredAmount.h>
53#include <xrpld/rpc/MPTokenIssuanceID.h>
54#include <xrpld/rpc/ServerHandler.h>
56#include <xrpl/basics/UptimeClock.h>
57#include <xrpl/basics/mulDiv.h>
58#include <xrpl/basics/safe_cast.h>
59#include <xrpl/basics/scope.h>
60#include <xrpl/beast/utility/rngfill.h>
61#include <xrpl/crypto/RFC1751.h>
62#include <xrpl/crypto/csprng.h>
63#include <xrpl/protocol/BuildInfo.h>
64#include <xrpl/protocol/Feature.h>
65#include <xrpl/protocol/MultiApiJson.h>
66#include <xrpl/protocol/NFTSyntheticSerializer.h>
67#include <xrpl/protocol/RPCErr.h>
68#include <xrpl/protocol/TxFlags.h>
69#include <xrpl/protocol/jss.h>
70#include <xrpl/resource/Fees.h>
71#include <xrpl/resource/ResourceManager.h>
73#include <boost/asio/ip/host_name.hpp>
74#include <boost/asio/steady_timer.hpp>
113 "ripple::NetworkOPsImp::TransactionStatus::TransactionStatus : "
156 std::chrono::steady_clock::time_point
start_ =
217 return !(*
this != b);
236 boost::asio::io_context& io_svc,
250 app_.logs().journal(
"FeeVote")),
253 app.getInboundTransactions(),
254 beast::get_abstract_clock<
std::chrono::steady_clock>(),
256 app_.logs().journal(
"LedgerConsensus"))
258 validatorKeys.keys ? validatorKeys.keys->publicKey
261 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
456 getServerInfo(
bool human,
bool admin,
bool counters)
override;
483 TER result)
override;
517 bool historyOnly)
override;
523 bool historyOnly)
override;
595 catch (boost::system::system_error
const& e)
598 <<
"NetworkOPs: heartbeatTimer cancel error: " << e.what();
605 catch (boost::system::system_error
const& e)
608 <<
"NetworkOPs: clusterTimer cancel error: " << e.what();
615 catch (boost::system::system_error
const& e)
618 <<
"NetworkOPs: accountHistoryTxTimer cancel error: "
623 using namespace std::chrono_literals;
633 boost::asio::steady_timer& timer,
816 template <
class Handler>
818 Handler
const& handler,
820 :
hook(collector->make_hook(handler))
823 "Disconnected_duration"))
826 "Connected_duration"))
828 collector->make_gauge(
"State_Accounting",
"Syncing_duration"))
831 "Tracking_duration"))
833 collector->make_gauge(
"State_Accounting",
"Full_duration"))
836 "Disconnected_transitions"))
839 "Connected_transitions"))
842 "Syncing_transitions"))
845 "Tracking_transitions"))
847 collector->make_gauge(
"State_Accounting",
"Full_transitions"))
876 {
"disconnected",
"connected",
"syncing",
"tracking",
"full"}};
938 static std::string const hostname = boost::asio::ip::host_name();
945 static std::string const shroudedHostId = [
this]() {
951 return shroudedHostId;
966 boost::asio::steady_timer& timer,
973 [
this, onExpire, onError](boost::system::error_code
const& e) {
974 if ((e.value() == boost::system::errc::success) &&
975 (!m_job_queue.isStopped()))
980 if (e.value() != boost::system::errc::success &&
981 e.value() != boost::asio::error::operation_aborted)
984 JLOG(m_journal.error())
985 <<
"Timer got error '" << e.message()
986 <<
"'. Restarting timer.";
991 timer.expires_after(expiry_time);
992 timer.async_wait(std::move(*optionalCountedHandler));
997NetworkOPsImp::setHeartbeatTimer()
1001 mConsensus.parms().ledgerGRANULARITY,
1003 m_job_queue.addJob(jtNETOP_TIMER,
"NetOPs.heartbeat", [this]() {
1004 processHeartbeatTimer();
1007 [
this]() { setHeartbeatTimer(); });
1011NetworkOPsImp::setClusterTimer()
1013 using namespace std::chrono_literals;
1020 processClusterTimer();
1023 [
this]() { setClusterTimer(); });
1029 JLOG(m_journal.debug()) <<
"Scheduling AccountHistory job for account "
1031 using namespace std::chrono_literals;
1033 accountHistoryTxTimer_,
1035 [
this, subInfo]() { addAccountHistoryJob(subInfo); },
1036 [
this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
1040NetworkOPsImp::processHeartbeatTimer()
1043 "Heartbeat Timer", mConsensus.validating(), m_journal);
1051 std::size_t const numPeers = app_.overlay().size();
1054 if (numPeers < minPeerCount_)
1056 if (mMode != OperatingMode::DISCONNECTED)
1058 setMode(OperatingMode::DISCONNECTED);
1060 ss <<
"Node count (" << numPeers <<
") has fallen "
1061 <<
"below required minimum (" << minPeerCount_ <<
").";
1062 JLOG(m_journal.warn()) << ss.
str();
1063 CLOG(clog.
ss()) <<
"set mode to DISCONNECTED: " << ss.
str();
1068 <<
"already DISCONNECTED. too few peers (" << numPeers
1069 <<
"), need at least " << minPeerCount_;
1076 setHeartbeatTimer();
1081 if (mMode == OperatingMode::DISCONNECTED)
1083 setMode(OperatingMode::CONNECTED);
1084 JLOG(m_journal.info())
1085 <<
"Node count (" << numPeers <<
") is sufficient.";
1086 CLOG(clog.
ss()) <<
"setting mode to CONNECTED based on " << numPeers
1092 auto origMode = mMode.load();
1093 CLOG(clog.
ss()) <<
"mode: " << strOperatingMode(origMode,
true);
1094 if (mMode == OperatingMode::SYNCING)
1095 setMode(OperatingMode::SYNCING);
1096 else if (mMode == OperatingMode::CONNECTED)
1097 setMode(OperatingMode::CONNECTED);
1098 auto newMode = mMode.load();
1099 if (origMode != newMode)
1102 <<
", changing to " << strOperatingMode(newMode,
true);
1104 CLOG(clog.
ss()) <<
". ";
1107 mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.
ss());
1109 CLOG(clog.
ss()) <<
"consensus phase " << to_string(mLastConsensusPhase);
1111 if (mLastConsensusPhase != currPhase)
1113 reportConsensusStateChange(currPhase);
1114 mLastConsensusPhase = currPhase;
1115 CLOG(clog.
ss()) <<
" changed to " << to_string(mLastConsensusPhase);
1117 CLOG(clog.
ss()) <<
". ";
1119 setHeartbeatTimer();
1123NetworkOPsImp::processClusterTimer()
1125 if (app_.cluster().size() == 0)
1128 using namespace std::chrono_literals;
1130 bool const update = app_.cluster().update(
1131 app_.nodeIdentity().first,
1133 (m_ledgerMaster.getValidatedLedgerAge() <= 4min)
1134 ? app_.getFeeTrack().getLocalFee()
1136 app_.timeKeeper().now());
1140 JLOG(m_journal.debug()) <<
"Too soon to send cluster update";
1145 protocol::TMCluster cluster;
1146 app_.cluster().for_each([&cluster](
ClusterNode const& node) {
1147 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1152 n.set_nodename(node.
name());
1156 for (
auto& item : gossip.
items)
1158 protocol::TMLoadSource& node = *cluster.add_loadsources();
1159 node.set_name(to_string(item.address));
1160 node.set_cost(item.balance);
1162 app_.overlay().foreach(
send_if(
1174 if (mode == OperatingMode::FULL && admin)
1176 auto const consensusMode = mConsensus.mode();
1177 if (consensusMode != ConsensusMode::wrongLedger)
1179 if (consensusMode == ConsensusMode::proposing)
1182 if (mConsensus.validating())
1183 return "validating";
1193 if (isNeedNetworkLedger())
1201 m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1203 JLOG(m_journal.error())
1204 <<
"Submitted transaction invalid: tfInnerBatchTxn flag present.";
1211 auto const txid = trans->getTransactionID();
1212 auto const flags = app_.getHashRouter().getFlags(txid);
1214 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1216 JLOG(m_journal.warn()) <<
"Submitted transaction cached bad";
1223 app_.getHashRouter(),
1225 m_ledgerMaster.getValidatedRules(),
1228 if (validity != Validity::Valid)
1230 JLOG(m_journal.warn())
1231 <<
"Submitted transaction invalid: " << reason;
1237 JLOG(m_journal.warn())
1238 <<
"Exception checking transaction " << txid <<
": " << ex.
what();
1247 m_job_queue.addJob(
jtTRANSACTION,
"submitTxn", [
this, tx]() {
1249 processTransaction(t,
false,
false, FailHard::no);
1256 auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
1258 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1261 JLOG(m_journal.warn()) << transaction->getID() <<
": cached bad!\n";
1262 transaction->setStatus(
INVALID);
1267 auto const view = m_ledgerMaster.getCurrentLedger();
1272 auto const sttx = *transaction->getSTransaction();
1273 if (sttx.isFlag(
tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1275 transaction->setStatus(
INVALID);
1277 app_.getHashRouter().setFlags(
1278 transaction->getID(), HashRouterFlags::BAD);
1285 auto const [validity, reason] =
1286 checkValidity(app_.getHashRouter(), sttx, view->rules(), app_.config());
1288 validity == Validity::Valid,
1289 "ripple::NetworkOPsImp::processTransaction : valid validity");
1292 if (validity == Validity::SigBad)
1294 JLOG(m_journal.info()) <<
"Transaction has bad signature: " << reason;
1295 transaction->setStatus(
INVALID);
1297 app_.getHashRouter().setFlags(
1298 transaction->getID(), HashRouterFlags::BAD);
1303 app_.getMasterTransaction().canonicalize(&transaction);
1309NetworkOPsImp::processTransaction(
1315 auto ev = m_job_queue.makeLoadEvent(
jtTXN_PROC,
"ProcessTXN");
1318 if (!preProcessTransaction(transaction))
1322 doTransactionSync(transaction, bUnlimited, failType);
1324 doTransactionAsync(transaction, bUnlimited, failType);
1328NetworkOPsImp::doTransactionAsync(
1335 if (transaction->getApplying())
1338 mTransactions.push_back(
1340 transaction->setApplying();
1342 if (mDispatchState == DispatchState::none)
1344 if (m_job_queue.addJob(
1345 jtBATCH,
"transactionBatch", [
this]() { transactionBatch(); }))
1347 mDispatchState = DispatchState::scheduled;
1353NetworkOPsImp::doTransactionSync(
1360 if (!transaction->getApplying())
1362 mTransactions.push_back(
1364 transaction->setApplying();
1367 doTransactionSyncBatch(
1369 return transaction->getApplying();
1374NetworkOPsImp::doTransactionSyncBatch(
1380 if (mDispatchState == DispatchState::running)
1389 if (mTransactions.size())
1392 if (m_job_queue.addJob(
jtBATCH,
"transactionBatch", [
this]() {
1396 mDispatchState = DispatchState::scheduled;
1400 }
while (retryCallback(lock));
1406 auto ev = m_job_queue.makeLoadEvent(
jtTXN_PROC,
"ProcessTXNSet");
1409 for (
auto const& [_, tx] :
set)
1414 if (transaction->getStatus() ==
INVALID)
1416 if (!reason.
empty())
1418 JLOG(m_journal.trace())
1419 <<
"Exception checking transaction: " << reason;
1421 app_.getHashRouter().setFlags(
1422 tx->getTransactionID(), HashRouterFlags::BAD);
1427 if (!preProcessTransaction(transaction))
1438 for (
auto& transaction : candidates)
1440 if (!transaction->getApplying())
1442 transactions.
emplace_back(transaction,
false,
false, FailHard::no);
1443 transaction->setApplying();
1447 if (mTransactions.empty())
1448 mTransactions.swap(transactions);
1451 mTransactions.reserve(mTransactions.size() + transactions.
size());
1452 for (
auto& t : transactions)
1453 mTransactions.push_back(std::move(t));
1455 if (mTransactions.empty())
1457 JLOG(m_journal.debug()) <<
"No transaction to process!";
1464 "ripple::NetworkOPsImp::processTransactionSet has lock");
1466 mTransactions.begin(), mTransactions.end(), [](
auto const& t) {
1467 return t.transaction->getApplying();
1473NetworkOPsImp::transactionBatch()
1477 if (mDispatchState == DispatchState::running)
1480 while (mTransactions.size())
1491 mTransactions.
swap(transactions);
1493 !transactions.
empty(),
1494 "ripple::NetworkOPsImp::apply : non-empty transactions");
1496 mDispatchState != DispatchState::running,
1497 "ripple::NetworkOPsImp::apply : is not running");
1499 mDispatchState = DispatchState::running;
1505 bool changed =
false;
1519 if (e.failType == FailHard::yes)
1522 auto const result = app_.getTxQ().apply(
1523 app_, view, e.transaction->getSTransaction(), flags, j);
1524 e.result = result.ter;
1525 e.applied = result.applied;
1526 changed = changed || result.applied;
1535 if (
auto const l = m_ledgerMaster.getValidatedLedger())
1536 validatedLedgerIndex = l->info().seq;
1538 auto newOL = app_.openLedger().current();
1541 e.transaction->clearSubmitResult();
1545 pubProposedTransaction(
1546 newOL, e.transaction->getSTransaction(), e.result);
1547 e.transaction->setApplied();
1550 e.transaction->setResult(e.result);
1553 app_.getHashRouter().setFlags(
1554 e.transaction->getID(), HashRouterFlags::BAD);
1563 JLOG(m_journal.info())
1564 <<
"TransactionResult: " << token <<
": " << human;
1569 bool addLocal = e.local;
1573 JLOG(m_journal.debug())
1574 <<
"Transaction is now included in open ledger";
1575 e.transaction->setStatus(
INCLUDED);
1580 auto const& txCur = e.transaction->getSTransaction();
1583 for (
auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
1585 txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
1592 if (t->getApplying())
1594 submit_held.
emplace_back(t,
false,
false, FailHard::no);
1603 JLOG(m_journal.info()) <<
"Transaction is obsolete";
1604 e.transaction->setStatus(
OBSOLETE);
1608 JLOG(m_journal.debug())
1609 <<
"Transaction is likely to claim a"
1610 <<
" fee, but is queued until fee drops";
1612 e.transaction->setStatus(
HELD);
1616 m_ledgerMaster.addHeldTransaction(e.transaction);
1617 e.transaction->setQueued();
1618 e.transaction->setKept();
1624 if (e.failType != FailHard::yes)
1626 auto const lastLedgerSeq =
1627 e.transaction->getSTransaction()->at(
1628 ~sfLastLedgerSequence);
1629 auto const ledgersLeft = lastLedgerSeq
1631 m_ledgerMaster.getCurrentLedgerIndex()
1650 (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
1651 app_.getHashRouter().setFlags(
1652 e.transaction->getID(), HashRouterFlags::HELD))
1655 JLOG(m_journal.debug())
1656 <<
"Transaction should be held: " << e.result;
1657 e.transaction->setStatus(
HELD);
1658 m_ledgerMaster.addHeldTransaction(e.transaction);
1659 e.transaction->setKept();
1662 JLOG(m_journal.debug())
1663 <<
"Not holding transaction "
1664 << e.transaction->getID() <<
": "
1665 << (e.local ?
"local" :
"network") <<
", "
1666 <<
"result: " << e.result <<
" ledgers left: "
1667 << (ledgersLeft ? to_string(*ledgersLeft)
1673 JLOG(m_journal.debug())
1674 <<
"Status other than success " << e.result;
1675 e.transaction->setStatus(
INVALID);
1678 auto const enforceFailHard =
1679 e.failType == FailHard::yes && !
isTesSuccess(e.result);
1681 if (addLocal && !enforceFailHard)
1683 m_localTX->push_back(
1684 m_ledgerMaster.getCurrentLedgerIndex(),
1685 e.transaction->getSTransaction());
1686 e.transaction->setKept();
1690 ((mMode != OperatingMode::FULL) &&
1691 (e.failType != FailHard::yes) && e.local) ||
1696 app_.getHashRouter().shouldRelay(e.transaction->getID());
1697 if (
auto const sttx = *(e.transaction->getSTransaction());
1702 newOL->rules().enabled(featureBatch)))
1704 protocol::TMTransaction tx;
1708 tx.set_rawtransaction(s.
data(), s.
size());
1709 tx.set_status(protocol::tsCURRENT);
1710 tx.set_receivetimestamp(
1711 app_.timeKeeper().now().time_since_epoch().count());
1714 app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
1715 e.transaction->setBroadcast();
1719 if (validatedLedgerIndex)
1721 auto [fee, accountSeq, availableSeq] =
1722 app_.getTxQ().getTxRequiredFeeAndSeq(
1723 *newOL, e.transaction->getSTransaction());
1724 e.transaction->setCurrentLedgerState(
1725 *validatedLedgerIndex, fee, accountSeq, availableSeq);
1733 e.transaction->clearApplying();
1735 if (!submit_held.
empty())
1737 if (mTransactions.empty())
1738 mTransactions.swap(submit_held);
1741 mTransactions.reserve(mTransactions.size() + submit_held.
size());
1742 for (
auto& e : submit_held)
1743 mTransactions.push_back(std::move(e));
1749 mDispatchState = DispatchState::none;
1757NetworkOPsImp::getOwnerInfo(
1762 auto root = keylet::ownerDir(account);
1763 auto sleNode = lpLedger->read(keylet::page(
root));
1770 for (
auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1772 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1775 "ripple::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1777 switch (sleCur->getType())
1780 if (!jvObjects.
isMember(jss::offers))
1781 jvObjects[jss::offers] =
1784 jvObjects[jss::offers].
append(
1785 sleCur->getJson(JsonOptions::none));
1788 case ltRIPPLE_STATE:
1789 if (!jvObjects.
isMember(jss::ripple_lines))
1791 jvObjects[jss::ripple_lines] =
1795 jvObjects[jss::ripple_lines].
append(
1796 sleCur->getJson(JsonOptions::none));
1799 case ltACCOUNT_ROOT:
1804 "ripple::NetworkOPsImp::getOwnerInfo : invalid "
1811 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1815 sleNode = lpLedger->read(keylet::page(
root, uNodeDir));
1818 "ripple::NetworkOPsImp::getOwnerInfo : read next page");
1831NetworkOPsImp::isBlocked()
1833 return isAmendmentBlocked() || isUNLBlocked();
1837NetworkOPsImp::isAmendmentBlocked()
1839 return amendmentBlocked_;
1843NetworkOPsImp::setAmendmentBlocked()
1845 amendmentBlocked_ =
true;
1846 setMode(OperatingMode::CONNECTED);
1850NetworkOPsImp::isAmendmentWarned()
1852 return !amendmentBlocked_ && amendmentWarned_;
1856NetworkOPsImp::setAmendmentWarned()
1858 amendmentWarned_ =
true;
1862NetworkOPsImp::clearAmendmentWarned()
1864 amendmentWarned_ =
false;
1868NetworkOPsImp::isUNLBlocked()
1874NetworkOPsImp::setUNLBlocked()
1877 setMode(OperatingMode::CONNECTED);
1881NetworkOPsImp::clearUNLBlocked()
1883 unlBlocked_ =
false;
1887NetworkOPsImp::checkLastClosedLedger(
1896 JLOG(m_journal.trace()) <<
"NetworkOPsImp::checkLastClosedLedger";
1898 auto const ourClosed = m_ledgerMaster.getClosedLedger();
1903 uint256 closedLedger = ourClosed->info().hash;
1904 uint256 prevClosedLedger = ourClosed->info().parentHash;
1905 JLOG(m_journal.trace()) <<
"OurClosed: " << closedLedger;
1906 JLOG(m_journal.trace()) <<
"PrevClosed: " << prevClosedLedger;
1911 auto& validations = app_.getValidations();
1912 JLOG(m_journal.debug())
1913 <<
"ValidationTrie " <<
Json::Compact(validations.getJsonTrie());
1917 peerCounts[closedLedger] = 0;
1918 if (mMode >= OperatingMode::TRACKING)
1919 peerCounts[closedLedger]++;
1921 for (
auto& peer : peerList)
1923 uint256 peerLedger = peer->getClosedLedgerHash();
1926 ++peerCounts[peerLedger];
1929 for (
auto const& it : peerCounts)
1930 JLOG(m_journal.debug()) <<
"L: " << it.first <<
" n=" << it.second;
1932 uint256 preferredLCL = validations.getPreferredLCL(
1934 m_ledgerMaster.getValidLedgerIndex(),
1937 bool switchLedgers = preferredLCL != closedLedger;
1939 closedLedger = preferredLCL;
1941 if (switchLedgers && (closedLedger == prevClosedLedger))
1944 JLOG(m_journal.info()) <<
"We won't switch to our own previous ledger";
1945 networkClosed = ourClosed->info().hash;
1946 switchLedgers =
false;
1949 networkClosed = closedLedger;
1954 auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
1957 consensus = app_.getInboundLedgers().acquire(
1958 closedLedger, 0, InboundLedger::Reason::CONSENSUS);
1961 (!m_ledgerMaster.canBeCurrent(consensus) ||
1962 !m_ledgerMaster.isCompatible(
1963 *consensus, m_journal.debug(),
"Not switching")))
1967 networkClosed = ourClosed->info().hash;
1971 JLOG(m_journal.warn()) <<
"We are not running on the consensus ledger";
1972 JLOG(m_journal.info()) <<
"Our LCL: " << ourClosed->info().hash
1974 JLOG(m_journal.info()) <<
"Net LCL " << closedLedger;
1976 if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
1978 setMode(OperatingMode::CONNECTED);
1986 switchLastClosedLedger(consensus);
1993NetworkOPsImp::switchLastClosedLedger(
1997 JLOG(m_journal.error())
1998 <<
"JUMP last closed ledger to " << newLCL->info().hash;
2000 clearNeedNetworkLedger();
2003 app_.getTxQ().processClosedLedger(app_, *newLCL,
true);
2010 auto retries = m_localTX->getTxSet();
2011 auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
2016 rules.
emplace(app_.config().features);
2017 app_.openLedger().accept(
2028 return app_.getTxQ().accept(app_, view);
2032 m_ledgerMaster.switchLCL(newLCL);
2034 protocol::TMStatusChange s;
2035 s.set_newevent(protocol::neSWITCHED_LEDGER);
2036 s.set_ledgerseq(newLCL->info().seq);
2037 s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
2038 s.set_ledgerhashprevious(
2039 newLCL->info().parentHash.begin(), newLCL->info().parentHash.size());
2040 s.set_ledgerhash(newLCL->info().hash.begin(), newLCL->info().hash.size());
2042 app_.overlay().foreach(
2047NetworkOPsImp::beginConsensus(
2053 "ripple::NetworkOPsImp::beginConsensus : nonzero input");
2055 auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
2057 JLOG(m_journal.info()) <<
"Consensus time for #" << closingInfo.seq
2058 <<
" with LCL " << closingInfo.parentHash;
2060 auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
2065 if (mMode == OperatingMode::FULL)
2067 JLOG(m_journal.warn()) <<
"Don't have LCL, going to tracking";
2068 setMode(OperatingMode::TRACKING);
2069 CLOG(clog) <<
"beginConsensus Don't have LCL, going to tracking. ";
2072 CLOG(clog) <<
"beginConsensus no previous ledger. ";
2077 prevLedger->info().hash == closingInfo.parentHash,
2078 "ripple::NetworkOPsImp::beginConsensus : prevLedger hash matches "
2081 closingInfo.parentHash == m_ledgerMaster.getClosedLedger()->info().hash,
2082 "ripple::NetworkOPsImp::beginConsensus : closedLedger parent matches "
2085 if (prevLedger->rules().enabled(featureNegativeUNL))
2086 app_.validators().setNegativeUNL(prevLedger->negativeUNL());
2087 TrustChanges const changes = app_.validators().updateTrusted(
2088 app_.getValidations().getCurrentNodeIDs(),
2089 closingInfo.parentCloseTime,
2092 app_.getHashRouter());
2094 if (!changes.
added.empty() || !changes.
removed.empty())
2096 app_.getValidations().trustChanged(changes.
added, changes.
removed);
2098 app_.getAmendmentTable().trustChanged(
2099 app_.validators().getQuorumKeys().second);
2102 mConsensus.startRound(
2103 app_.timeKeeper().closeTime(),
2111 if (mLastConsensusPhase != currPhase)
2113 reportConsensusStateChange(currPhase);
2114 mLastConsensusPhase = currPhase;
2117 JLOG(m_journal.debug()) <<
"Initiating consensus engine";
2124 auto const& peerKey = peerPos.
publicKey();
2125 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2136 JLOG(m_journal.error())
2137 <<
"Received a proposal signed by MY KEY from a peer. This may "
2138 "indicate a misconfiguration where another node has the same "
2139 "validator key, or may be caused by unusual message routing and "
2144 return mConsensus.peerProposal(app_.timeKeeper().closeTime(), peerPos);
2155 protocol::TMHaveTransactionSet msg;
2156 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
2157 msg.set_status(protocol::tsHAVE);
2158 app_.overlay().foreach(
2163 mConsensus.gotTxSet(app_.timeKeeper().closeTime(),
RCLTxSet{map});
2169 uint256 deadLedger = m_ledgerMaster.getClosedLedger()->info().parentHash;
2171 for (
auto const& it : app_.overlay().getActivePeers())
2173 if (it && (it->getClosedLedgerHash() == deadLedger))
2175 JLOG(m_journal.trace()) <<
"Killing obsolete peer status";
2182 checkLastClosedLedger(app_.overlay().getActivePeers(), networkClosed);
2184 if (networkClosed.
isZero())
2186 CLOG(clog) <<
"endConsensus last closed ledger is zero. ";
2196 if (((mMode == OperatingMode::CONNECTED) ||
2197 (mMode == OperatingMode::SYNCING)) &&
2203 if (!needNetworkLedger_)
2204 setMode(OperatingMode::TRACKING);
2207 if (((mMode == OperatingMode::CONNECTED) ||
2208 (mMode == OperatingMode::TRACKING)) &&
2214 auto current = m_ledgerMaster.getCurrentLedger();
2215 if (app_.timeKeeper().now() < (
current->info().parentCloseTime +
2216 2 *
current->info().closeTimeResolution))
2218 setMode(OperatingMode::FULL);
2222 beginConsensus(networkClosed, clog);
2226NetworkOPsImp::consensusViewChange()
2228 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2230 setMode(OperatingMode::CONNECTED);
2240 if (!mStreamMaps[sManifests].empty())
2244 jvObj[jss::type] =
"manifestReceived";
2247 jvObj[jss::signing_key] =
2251 jvObj[jss::signature] =
strHex(*sig);
2254 jvObj[jss::domain] = mo.
domain;
2257 for (
auto i = mStreamMaps[sManifests].begin();
2258 i != mStreamMaps[sManifests].end();)
2260 if (
auto p = i->second.lock())
2262 p->send(jvObj,
true);
2267 i = mStreamMaps[sManifests].erase(i);
2273NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
2277 : loadFactorServer{loadFeeTrack.getLoadFactor()}
2278 , loadBaseServer{loadFeeTrack.getLoadBase()}
2280 , em{
std::move(escalationMetrics)}
2290 em.has_value() != b.
em.has_value())
2296 em->minProcessingFeeLevel != b.
em->minProcessingFeeLevel ||
2297 em->openLedgerFeeLevel != b.
em->openLedgerFeeLevel ||
2298 em->referenceFeeLevel != b.
em->referenceFeeLevel);
2331 jvObj[jss::type] =
"serverStatus";
2333 jvObj[jss::load_base] = f.loadBaseServer;
2334 jvObj[jss::load_factor_server] = f.loadFactorServer;
2335 jvObj[jss::base_fee] = f.baseFee.jsonClipped();
2340 safe_cast<std::uint64_t>(f.loadFactorServer),
2342 f.em->openLedgerFeeLevel,
2344 f.em->referenceFeeLevel)
2347 jvObj[jss::load_factor] =
trunc32(loadFactor);
2348 jvObj[jss::load_factor_fee_escalation] =
2349 f.em->openLedgerFeeLevel.jsonClipped();
2350 jvObj[jss::load_factor_fee_queue] =
2351 f.em->minProcessingFeeLevel.jsonClipped();
2352 jvObj[jss::load_factor_fee_reference] =
2353 f.em->referenceFeeLevel.jsonClipped();
2356 jvObj[jss::load_factor] = f.loadFactorServer;
2370 p->send(jvObj,
true);
2387 if (!streamMap.empty())
2390 jvObj[jss::type] =
"consensusPhase";
2391 jvObj[jss::consensus] =
to_string(phase);
2393 for (
auto i = streamMap.begin(); i != streamMap.end();)
2395 if (
auto p = i->second.lock())
2397 p->send(jvObj,
true);
2402 i = streamMap.erase(i);
2418 auto const signerPublic = val->getSignerPublic();
2420 jvObj[jss::type] =
"validationReceived";
2421 jvObj[jss::validation_public_key] =
2423 jvObj[jss::ledger_hash] =
to_string(val->getLedgerHash());
2424 jvObj[jss::signature] =
strHex(val->getSignature());
2425 jvObj[jss::full] = val->isFull();
2426 jvObj[jss::flags] = val->getFlags();
2427 jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
2428 jvObj[jss::data] =
strHex(val->getSerializer().slice());
2431 if (
auto version = (*val)[~sfServerVersion])
2434 if (
auto cookie = (*val)[~sfCookie])
2437 if (
auto hash = (*val)[~sfValidatedHash])
2438 jvObj[jss::validated_hash] =
strHex(*hash);
2440 auto const masterKey =
2443 if (masterKey != signerPublic)
2448 if (
auto const seq = (*val)[~sfLedgerSequence])
2449 jvObj[jss::ledger_index] = *seq;
2451 if (val->isFieldPresent(sfAmendments))
2454 for (
auto const& amendment : val->getFieldV256(sfAmendments))
2455 jvObj[jss::amendments].append(
to_string(amendment));
2458 if (
auto const closeTime = (*val)[~sfCloseTime])
2459 jvObj[jss::close_time] = *closeTime;
2461 if (
auto const loadFee = (*val)[~sfLoadFee])
2462 jvObj[jss::load_fee] = *loadFee;
2464 if (
auto const baseFee = val->at(~sfBaseFee))
2465 jvObj[jss::base_fee] =
static_cast<double>(*baseFee);
2467 if (
auto const reserveBase = val->at(~sfReserveBase))
2468 jvObj[jss::reserve_base] = *reserveBase;
2470 if (
auto const reserveInc = val->at(~sfReserveIncrement))
2471 jvObj[jss::reserve_inc] = *reserveInc;
2475 if (
auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops);
2476 baseFeeXRP && baseFeeXRP->native())
2477 jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();
2479 if (
auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
2480 reserveBaseXRP && reserveBaseXRP->native())
2481 jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();
2483 if (
auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
2484 reserveIncXRP && reserveIncXRP->native())
2485 jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();
2494 if (jvTx.
isMember(jss::ledger_index))
2496 jvTx[jss::ledger_index] =
2504 if (
auto p = i->second.lock())
2508 [&](
Json::Value const& jv) { p->send(jv,
true); });
2528 jvObj[jss::type] =
"peerStatusChange";
2537 p->send(jvObj,
true);
2551 using namespace std::chrono_literals;
2583 <<
"recvValidation " << val->getLedgerHash() <<
" from " << source;
2599 <<
"Exception thrown for handling new validation "
2600 << val->getLedgerHash() <<
": " << e.
what();
2605 <<
"Unknown exception thrown for handling new validation "
2606 << val->getLedgerHash();
2618 ss <<
"VALIDATION: " << val->render() <<
" master_key: ";
2655 "This server is amendment blocked, and must be updated to be "
2656 "able to stay in sync with the network.";
2663 "This server has an expired validator list. validators.txt "
2664 "may be incorrectly configured or some [validator_list_sites] "
2665 "may be unreachable.";
2672 "One or more unsupported amendments have reached majority. "
2673 "Upgrade to the latest version before they are activated "
2674 "to avoid being amendment blocked.";
2675 if (
auto const expected =
2679 d[jss::expected_date] = expected->time_since_epoch().count();
2680 d[jss::expected_date_UTC] =
to_string(*expected);
2684 if (warnings.size())
2685 info[jss::warnings] = std::move(warnings);
2700 info[jss::time] =
to_string(std::chrono::floor<std::chrono::microseconds>(
2704 info[jss::network_ledger] =
"waiting";
2706 info[jss::validation_quorum] =
2714 info[jss::node_size] =
"tiny";
2717 info[jss::node_size] =
"small";
2720 info[jss::node_size] =
"medium";
2723 info[jss::node_size] =
"large";
2726 info[jss::node_size] =
"huge";
2735 info[jss::validator_list_expires] =
2736 safe_cast<Json::UInt>(when->time_since_epoch().count());
2738 info[jss::validator_list_expires] = 0;
2748 if (*when == TimeKeeper::time_point::max())
2750 x[jss::expiration] =
"never";
2751 x[jss::status] =
"active";
2758 x[jss::status] =
"active";
2760 x[jss::status] =
"expired";
2765 x[jss::status] =
"unknown";
2766 x[jss::expiration] =
"unknown";
2770#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2773#ifdef GIT_COMMIT_HASH
2774 x[jss::hash] = GIT_COMMIT_HASH;
2777 x[jss::branch] = GIT_BRANCH;
2782 info[jss::io_latency_ms] =
2790 info[jss::pubkey_validator] =
2795 info[jss::pubkey_validator] =
"none";
2805 info[jss::counters][jss::nodestore] = nodestore;
2809 info[jss::pubkey_node] =
2815 info[jss::amendment_blocked] =
true;
2829 lastClose[jss::converge_time_s] =
2834 lastClose[jss::converge_time] =
2838 info[jss::last_close] = lastClose;
2846 info[jss::network_id] =
static_cast<Json::UInt>(*netid);
2848 auto const escalationMetrics =
2856 auto const loadFactorFeeEscalation =
2858 escalationMetrics.openLedgerFeeLevel,
2860 escalationMetrics.referenceFeeLevel)
2864 safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2868 info[jss::load_base] = loadBaseServer;
2869 info[jss::load_factor] =
trunc32(loadFactor);
2870 info[jss::load_factor_server] = loadFactorServer;
2877 info[jss::load_factor_fee_escalation] =
2878 escalationMetrics.openLedgerFeeLevel.jsonClipped();
2879 info[jss::load_factor_fee_queue] =
2880 escalationMetrics.minProcessingFeeLevel.jsonClipped();
2881 info[jss::load_factor_fee_reference] =
2882 escalationMetrics.referenceFeeLevel.jsonClipped();
2886 info[jss::load_factor] =
2887 static_cast<double>(loadFactor) / loadBaseServer;
2889 if (loadFactorServer != loadFactor)
2890 info[jss::load_factor_server] =
2891 static_cast<double>(loadFactorServer) / loadBaseServer;
2896 if (fee != loadBaseServer)
2897 info[jss::load_factor_local] =
2898 static_cast<double>(fee) / loadBaseServer;
2900 if (fee != loadBaseServer)
2901 info[jss::load_factor_net] =
2902 static_cast<double>(fee) / loadBaseServer;
2904 if (fee != loadBaseServer)
2905 info[jss::load_factor_cluster] =
2906 static_cast<double>(fee) / loadBaseServer;
2908 if (escalationMetrics.openLedgerFeeLevel !=
2909 escalationMetrics.referenceFeeLevel &&
2910 (admin || loadFactorFeeEscalation != loadFactor))
2911 info[jss::load_factor_fee_escalation] =
2912 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2913 escalationMetrics.referenceFeeLevel);
2914 if (escalationMetrics.minProcessingFeeLevel !=
2915 escalationMetrics.referenceFeeLevel)
2916 info[jss::load_factor_fee_queue] =
2917 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2918 escalationMetrics.referenceFeeLevel);
2931 XRPAmount const baseFee = lpClosed->fees().base;
2933 l[jss::seq] =
Json::UInt(lpClosed->info().seq);
2934 l[jss::hash] =
to_string(lpClosed->info().hash);
2939 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2940 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2942 lpClosed->info().closeTime.time_since_epoch().count());
2947 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2948 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2951 std::abs(closeOffset.count()) >= 60)
2952 l[jss::close_time_offset] =
2960 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2964 auto lCloseTime = lpClosed->info().closeTime;
2966 if (lCloseTime <= closeTime)
2968 using namespace std::chrono_literals;
2969 auto age = closeTime - lCloseTime;
2971 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2977 info[jss::validated_ledger] = l;
2979 info[jss::closed_ledger] = l;
2983 info[jss::published_ledger] =
"none";
2984 else if (lpPublished->info().seq != lpClosed->info().seq)
2985 info[jss::published_ledger] = lpPublished->info().seq;
2990 info[jss::jq_trans_overflow] =
2992 info[jss::peer_disconnects] =
2994 info[jss::peer_disconnects_resources] =
2999 "http",
"https",
"peer",
"ws",
"ws2",
"wss",
"wss2"};
3007 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() &&
3008 port.admin_user.empty() && port.admin_password.empty()))
3022 for (
auto const& p : proto)
3023 jv[jss::protocol].append(p);
3030 auto const optPort = grpcSection.
get(
"port");
3031 if (optPort && grpcSection.get(
"ip"))
3034 jv[jss::port] = *optPort;
3036 jv[jss::protocol].
append(
"grpc");
3039 info[jss::ports] = std::move(ports);
3065 ledger->rules().enabled(featureBatch))
3083 [&](
Json::Value const& jv) { p->send(jv, true); });
3108 lpAccepted->info().hash, alpAccepted);
3112 alpAccepted->getLedger().
get() == lpAccepted.
get(),
3113 "ripple::NetworkOPsImp::pubLedger : accepted input");
3117 <<
"Publishing ledger " << lpAccepted->info().seq <<
" "
3118 << lpAccepted->info().hash;
3126 jvObj[jss::type] =
"ledgerClosed";
3127 jvObj[jss::ledger_index] = lpAccepted->info().seq;
3128 jvObj[jss::ledger_hash] =
to_string(lpAccepted->info().hash);
3130 lpAccepted->info().closeTime.time_since_epoch().count());
3134 if (!lpAccepted->rules().enabled(featureXRPFees))
3136 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3137 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3138 jvObj[jss::reserve_inc] =
3139 lpAccepted->fees().increment.jsonClipped();
3141 jvObj[jss::txn_count] =
Json::UInt(alpAccepted->size());
3145 jvObj[jss::validated_ledgers] =
3155 p->send(jvObj,
true);
3173 p->send(jvObj,
true);
3182 static bool firstTime =
true;
3189 for (
auto& inner : outer.second)
3191 auto& subInfo = inner.second;
3192 if (subInfo.index_->separationLedgerSeq_ == 0)
3195 alpAccepted->getLedger(), subInfo);
3204 for (
auto const& accTx : *alpAccepted)
3208 lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3235 "reportConsensusStateChange->pubConsensus",
3266 jvObj[jss::type] =
"transaction";
3270 jvObj[jss::transaction] =
3277 jvObj[jss::meta], *ledger, transaction, meta->
get());
3280 jvObj[jss::meta], transaction, meta->
get());
3284 if (
auto const& lookup = ledger->txRead(transaction->getTransactionID());
3285 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3287 uint32_t
const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
3289 if (transaction->isFieldPresent(sfNetworkID))
3290 netID = transaction->getFieldU32(sfNetworkID);
3295 jvObj[jss::ctid] = *ctid;
3297 if (!ledger->open())
3298 jvObj[jss::ledger_hash] =
to_string(ledger->info().hash);
3302 jvObj[jss::ledger_index] = ledger->info().seq;
3303 jvObj[jss::transaction][jss::date] =
3304 ledger->info().closeTime.time_since_epoch().count();
3305 jvObj[jss::validated] =
true;
3306 jvObj[jss::close_time_iso] =
to_string_iso(ledger->info().closeTime);
3312 jvObj[jss::validated] =
false;
3313 jvObj[jss::ledger_current_index] = ledger->info().seq;
3316 jvObj[jss::status] = validated ?
"closed" :
"proposed";
3317 jvObj[jss::engine_result] = sToken;
3318 jvObj[jss::engine_result_code] = result;
3319 jvObj[jss::engine_result_message] = sHuman;
3321 if (transaction->getTxnType() == ttOFFER_CREATE)
3323 auto const account = transaction->getAccountID(sfAccount);
3324 auto const amount = transaction->getFieldAmount(sfTakerGets);
3327 if (account != amount.issue().account)
3335 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3343 [&]<
unsigned Version>(
3345 RPC::insertDeliverMax(
3346 jvTx[jss::transaction], transaction->getTxnType(), Version);
3348 if constexpr (Version > 1)
3350 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3351 jvTx[jss::hash] = hash;
3355 jvTx[jss::transaction][jss::hash] = hash;
3368 auto const& stTxn = transaction.
getTxn();
3372 auto const trResult = transaction.
getResult();
3387 [&](
Json::Value const& jv) { p->send(jv, true); });
3404 [&](
Json::Value const& jv) { p->send(jv, true); });
3429 auto const currLedgerSeq = ledger->seq();
3436 for (
auto const& affectedAccount : transaction.
getAffected())
3441 auto it = simiIt->second.begin();
3443 while (it != simiIt->second.end())
3454 it = simiIt->second.erase(it);
3461 auto it = simiIt->second.begin();
3462 while (it != simiIt->second.end())
3473 it = simiIt->second.erase(it);
3480 auto& subs = histoIt->second;
3481 auto it = subs.begin();
3482 while (it != subs.end())
3485 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3499 it = subs.erase(it);
3510 <<
"pubAccountTransaction: "
3511 <<
"proposed=" << iProposed <<
", accepted=" << iAccepted;
3513 if (!notify.
empty() || !accountHistoryNotify.
empty())
3515 auto const& stTxn = transaction.
getTxn();
3519 auto const trResult = transaction.
getResult();
3525 isrListener->getApiVersion(),
3526 [&](
Json::Value const& jv) { isrListener->send(jv,
true); });
3530 jvObj.
set(jss::account_history_boundary,
true);
3533 jvObj.
isMember(jss::account_history_tx_stream) ==
3535 "ripple::NetworkOPsImp::pubAccountTransaction : "
3536 "account_history_tx_stream not set");
3537 for (
auto& info : accountHistoryNotify)
3539 auto& index = info.index_;
3540 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3541 jvObj.
set(jss::account_history_tx_first,
true);
3543 jvObj.
set(jss::account_history_tx_index, index->forwardTxIndex_++);
3546 info.sink_->getApiVersion(),
3547 [&](
Json::Value const& jv) { info.sink_->send(jv,
true); });
3572 for (
auto const& affectedAccount : tx->getMentionedAccounts())
3577 auto it = simiIt->second.begin();
3579 while (it != simiIt->second.end())
3590 it = simiIt->second.erase(it);
3597 JLOG(
m_journal.
trace()) <<
"pubProposedAccountTransaction: " << iProposed;
3599 if (!notify.
empty() || !accountHistoryNotify.
empty())
3606 isrListener->getApiVersion(),
3607 [&](
Json::Value const& jv) { isrListener->send(jv,
true); });
3610 jvObj.
isMember(jss::account_history_tx_stream) ==
3612 "ripple::NetworkOPs::pubProposedAccountTransaction : "
3613 "account_history_tx_stream not set");
3614 for (
auto& info : accountHistoryNotify)
3616 auto& index = info.index_;
3617 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3618 jvObj.
set(jss::account_history_tx_first,
true);
3619 jvObj.
set(jss::account_history_tx_index, index->forwardTxIndex_++);
3621 info.sink_->getApiVersion(),
3622 [&](
Json::Value const& jv) { info.sink_->send(jv,
true); });
3639 for (
auto const& naAccountID : vnaAccountIDs)
3642 <<
"subAccount: account: " <<
toBase58(naAccountID);
3644 isrListener->insertSubAccountInfo(naAccountID, rt);
3649 for (
auto const& naAccountID : vnaAccountIDs)
3651 auto simIterator = subMap.
find(naAccountID);
3652 if (simIterator == subMap.
end())
3656 usisElement[isrListener->getSeq()] = isrListener;
3658 subMap.
insert(simIterator, make_pair(naAccountID, usisElement));
3663 simIterator->second[isrListener->getSeq()] = isrListener;
3674 for (
auto const& naAccountID : vnaAccountIDs)
3677 isrListener->deleteSubAccountInfo(naAccountID, rt);
3694 for (
auto const& naAccountID : vnaAccountIDs)
3696 auto simIterator = subMap.
find(naAccountID);
3698 if (simIterator != subMap.
end())
3701 simIterator->second.erase(uSeq);
3703 if (simIterator->second.empty())
3706 subMap.
erase(simIterator);
3715 enum DatabaseType { Sqlite,
None };
3716 static auto const databaseType = [&]() -> DatabaseType {
3721 return DatabaseType::Sqlite;
3723 return DatabaseType::None;
3726 if (databaseType == DatabaseType::None)
3730 "ripple::NetworkOPsImp::addAccountHistoryJob : no database");
3732 <<
"AccountHistory job for account "
3745 "AccountHistoryTxStream",
3746 [
this, dbType = databaseType, subInfo]() {
3747 auto const& accountId = subInfo.
index_->accountId_;
3748 auto& lastLedgerSeq = subInfo.
index_->historyLastLedgerSeq_;
3749 auto& txHistoryIndex = subInfo.
index_->historyTxIndex_;
3752 <<
"AccountHistory job for account " <<
toBase58(accountId)
3753 <<
" started. lastLedgerSeq=" << lastLedgerSeq;
3763 auto stx = tx->getSTransaction();
3764 if (stx->getAccountID(sfAccount) == accountId &&
3765 stx->getSeqValue() == 1)
3769 for (
auto& node : meta->getNodes())
3771 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3774 if (node.isFieldPresent(sfNewFields))
3776 if (
auto inner =
dynamic_cast<STObject const*
>(
3777 node.peekAtPField(sfNewFields));
3780 if (inner->isFieldPresent(sfAccount) &&
3781 inner->getAccountID(sfAccount) == accountId)
3793 bool unsubscribe) ->
bool {
3796 sptr->send(jvObj,
true);
3806 bool unsubscribe) ->
bool {
3810 sptr->getApiVersion(),
3811 [&](
Json::Value const& jv) { sptr->send(jv,
true); });
3834 accountId, minLedger, maxLedger, marker, 0,
true};
3835 return db->newestAccountTxPage(options);
3840 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3841 "getMoreTxns : invalid database type");
3851 while (lastLedgerSeq >= 2 && !subInfo.
index_->stopHistorical_)
3853 int feeChargeCount = 0;
3862 <<
"AccountHistory job for account "
3863 <<
toBase58(accountId) <<
" no InfoSub. Fee charged "
3864 << feeChargeCount <<
" times.";
3869 auto startLedgerSeq =
3870 (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3872 <<
"AccountHistory job for account " <<
toBase58(accountId)
3873 <<
", working on ledger range [" << startLedgerSeq <<
","
3874 << lastLedgerSeq <<
"]";
3876 auto haveRange = [&]() ->
bool {
3879 auto haveSomeValidatedLedgers =
3881 validatedMin, validatedMax);
3883 return haveSomeValidatedLedgers &&
3884 validatedMin <= startLedgerSeq &&
3885 lastLedgerSeq <= validatedMax;
3891 <<
"AccountHistory reschedule job for account "
3892 <<
toBase58(accountId) <<
", incomplete ledger range ["
3893 << startLedgerSeq <<
"," << lastLedgerSeq <<
"]";
3899 while (!subInfo.
index_->stopHistorical_)
3902 getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3907 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3908 "getMoreTxns failed");
3910 <<
"AccountHistory job for account "
3911 <<
toBase58(accountId) <<
" getMoreTxns failed.";
3917 auto const& txns = dbResult->first;
3918 marker = dbResult->second;
3919 size_t num_txns = txns.size();
3920 for (
size_t i = 0; i < num_txns; ++i)
3922 auto const& [tx, meta] = txns[i];
3927 <<
"AccountHistory job for account "
3928 <<
toBase58(accountId) <<
" empty tx or meta.";
3939 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3940 "getLedgerBySeq failed");
3942 <<
"AccountHistory job for account "
3943 <<
toBase58(accountId) <<
" no ledger.";
3949 tx->getSTransaction();
3954 "NetworkOPsImp::addAccountHistoryJob : "
3955 "getSTransaction failed");
3957 <<
"AccountHistory job for account "
3959 <<
" getSTransaction failed.";
3966 auto const trR = meta->getResultTER();
3968 transJson(stTxn, trR,
true, curTxLedger, mRef);
3971 jss::account_history_tx_index, txHistoryIndex--);
3972 if (i + 1 == num_txns ||
3973 txns[i + 1].first->getLedger() != tx->getLedger())
3974 jvTx.
set(jss::account_history_boundary,
true);
3976 if (isFirstTx(tx, meta))
3978 jvTx.
set(jss::account_history_tx_first,
true);
3979 sendMultiApiJson(jvTx,
false);
3982 <<
"AccountHistory job for account "
3984 <<
" done, found last tx.";
3989 sendMultiApiJson(jvTx,
false);
3996 <<
"AccountHistory job for account "
3998 <<
" paging, marker=" << marker->ledgerSeq <<
":"
4007 if (!subInfo.
index_->stopHistorical_)
4009 lastLedgerSeq = startLedgerSeq - 1;
4010 if (lastLedgerSeq <= 1)
4013 <<
"AccountHistory job for account "
4015 <<
" done, reached genesis ledger.";
4028 subInfo.
index_->separationLedgerSeq_ = ledger->seq();
4029 auto const& accountId = subInfo.
index_->accountId_;
4031 if (!ledger->exists(accountKeylet))
4034 <<
"subAccountHistoryStart, no account " <<
toBase58(accountId)
4035 <<
", no need to add AccountHistory job.";
4040 if (
auto const sleAcct = ledger->read(accountKeylet); sleAcct)
4042 if (sleAcct->getFieldU32(sfSequence) == 1)
4045 <<
"subAccountHistoryStart, genesis account "
4047 <<
" does not have tx, no need to add AccountHistory job.";
4055 "ripple::NetworkOPsImp::subAccountHistoryStart : failed to "
4056 "access genesis account");
4061 subInfo.
index_->historyLastLedgerSeq_ = ledger->seq();
4062 subInfo.
index_->haveHistorical_ =
true;
4065 <<
"subAccountHistoryStart, add AccountHistory job: accountId="
4066 <<
toBase58(accountId) <<
", currentLedgerSeq=" << ledger->seq();
4076 if (!isrListener->insertSubAccountHistory(accountId))
4079 <<
"subAccountHistory, already subscribed to account "
4091 inner.
emplace(isrListener->getSeq(), ahi);
4097 simIterator->second.emplace(isrListener->getSeq(), ahi);
4111 <<
"subAccountHistory, no validated ledger yet, delay start";
4124 isrListener->deleteSubAccountHistory(account);
4138 auto& subInfoMap = simIterator->second;
4139 auto subInfoIter = subInfoMap.find(seq);
4140 if (subInfoIter != subInfoMap.end())
4142 subInfoIter->second.index_->stopHistorical_ =
true;
4147 simIterator->second.erase(seq);
4148 if (simIterator->second.empty())
4154 <<
"unsubAccountHistory, account " <<
toBase58(account)
4155 <<
", historyOnly = " << (historyOnly ?
"true" :
"false");
4163 listeners->addSubscriber(isrListener);
4167 UNREACHABLE(
"ripple::NetworkOPsImp::subBook : null book listeners");
4177 listeners->removeSubscriber(uSeq);
4189 m_standalone,
"ripple::NetworkOPsImp::acceptLedger : is standalone");
4192 Throw<std::runtime_error>(
4193 "Operation only possible in STANDALONE mode.");
4208 jvResult[jss::ledger_index] = lpClosed->info().seq;
4209 jvResult[jss::ledger_hash] =
to_string(lpClosed->info().hash);
4211 lpClosed->info().closeTime.time_since_epoch().count());
4212 if (!lpClosed->rules().enabled(featureXRPFees))
4214 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4215 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4216 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4222 jvResult[jss::validated_ledgers] =
4228 .emplace(isrListener->getSeq(), isrListener)
4238 .emplace(isrListener->getSeq(), isrListener)
4264 .emplace(isrListener->getSeq(), isrListener)
4292 jvResult[jss::random] =
to_string(uRandom);
4294 jvResult[jss::load_base] = feeTrack.getLoadBase();
4295 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4296 jvResult[jss::hostid] =
getHostId(admin);
4297 jvResult[jss::pubkey_node] =
4302 .emplace(isrListener->getSeq(), isrListener)
4320 .emplace(isrListener->getSeq(), isrListener)
4338 .emplace(isrListener->getSeq(), isrListener)
4356 .emplace(isrListener->getSeq(), isrListener)
4380 .emplace(isrListener->getSeq(), isrListener)
4398 .emplace(isrListener->getSeq(), isrListener)
4446 if (map.find(pInfo->getSeq()) != map.end())
4453#ifndef USE_NEW_BOOK_PAGE
4464 unsigned int iLimit,
4474 uint256 uTipIndex = uBookBase;
4478 stream <<
"getBookPage:" << book;
4479 stream <<
"getBookPage: uBookBase=" << uBookBase;
4480 stream <<
"getBookPage: uBookEnd=" << uBookEnd;
4481 stream <<
"getBookPage: uTipIndex=" << uTipIndex;
4490 bool bDirectAdvance =
true;
4494 unsigned int uBookEntry;
4500 while (!bDone && iLimit-- > 0)
4504 bDirectAdvance =
false;
4508 auto const ledgerIndex = view.
succ(uTipIndex, uBookEnd);
4512 sleOfferDir.
reset();
4521 uTipIndex = sleOfferDir->key();
4524 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4527 <<
"getBookPage: uTipIndex=" << uTipIndex;
4529 <<
"getBookPage: offerIndex=" << offerIndex;
4539 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4540 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4541 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4543 bool firstOwnerOffer(
true);
4549 saOwnerFunds = saTakerGets;
4551 else if (bGlobalFreeze)
4559 auto umBalanceEntry = umBalance.
find(uOfferOwnerID);
4560 if (umBalanceEntry != umBalance.
end())
4564 saOwnerFunds = umBalanceEntry->second;
4565 firstOwnerOffer =
false;
4579 if (saOwnerFunds < beast::zero)
4583 saOwnerFunds.
clear();
4591 STAmount saOwnerFundsLimit = saOwnerFunds;
4603 saOwnerFundsLimit =
divide(saOwnerFunds, offerRate);
4606 if (saOwnerFundsLimit >= saTakerGets)
4609 saTakerGetsFunded = saTakerGets;
4615 saTakerGetsFunded = saOwnerFundsLimit;
4617 saTakerGetsFunded.
setJson(jvOffer[jss::taker_gets_funded]);
4621 saTakerGetsFunded, saDirRate, saTakerPays.
issue()))
4622 .setJson(jvOffer[jss::taker_pays_funded]);
4628 saOwnerFunds,
multiply(saTakerGetsFunded, offerRate));
4630 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4634 jvOf[jss::quality] = saDirRate.
getText();
4636 if (firstOwnerOffer)
4637 jvOf[jss::owner_funds] = saOwnerFunds.
getText();
4644 if (!
cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4646 bDirectAdvance =
true;
4651 <<
"getBookPage: offerIndex=" << offerIndex;
4671 unsigned int iLimit,
4679 MetaView lesActive(lpLedger,
tapNONE,
true);
4680 OrderBookIterator obIterator(lesActive, book);
4684 bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.
out.
account) ||
4685 lesActive.isGlobalFrozen(book.
in.
account);
4687 while (iLimit-- > 0 && obIterator.nextOffer())
4692 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4693 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4694 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4695 STAmount saDirRate = obIterator.getCurrentRate();
4701 saOwnerFunds = saTakerGets;
4703 else if (bGlobalFreeze)
4711 auto umBalanceEntry = umBalance.
find(uOfferOwnerID);
4713 if (umBalanceEntry != umBalance.
end())
4717 saOwnerFunds = umBalanceEntry->second;
4723 saOwnerFunds = lesActive.accountHolds(
4729 if (saOwnerFunds.isNegative())
4733 saOwnerFunds.zero();
4740 STAmount saTakerGetsFunded;
4741 STAmount saOwnerFundsLimit = saOwnerFunds;
4753 saOwnerFundsLimit =
divide(saOwnerFunds, offerRate);
4756 if (saOwnerFundsLimit >= saTakerGets)
4759 saTakerGetsFunded = saTakerGets;
4764 saTakerGetsFunded = saOwnerFundsLimit;
4766 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4772 multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4773 .setJson(jvOffer[jss::taker_pays_funded]);
4776 STAmount saOwnerPays = (
parityRate == offerRate)
4779 saOwnerFunds,
multiply(saTakerGetsFunded, offerRate));
4781 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4783 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4787 jvOf[jss::quality] = saDirRate.
getText();
4802 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4842 ++counters_[
static_cast<std::size_t>(om)].transitions;
4844 counters_[
static_cast<std::size_t>(om)].transitions == 1)
4846 initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
4847 now - processStart_)
4851 std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4860 auto [counters, mode, start, initialSync] = getCounterData();
4861 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4871 auto& state = obj[jss::state_accounting][
states_[i]];
4872 state[jss::transitions] =
std::to_string(counters[i].transitions);
4873 state[jss::duration_us] =
std::to_string(counters[i].dur.count());
4877 obj[jss::initial_sync_duration_us] =
std::to_string(initialSync);
4892 boost::asio::io_context& io_svc,
T back_inserter(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value; otherwise returns defaultValue.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
A metric for measuring an integral value.
void set(value_type value) const
Set the value on the gauge.
A reference to a handler for performing polled collection.
A transaction that is in a closed ledger.
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
TxMeta const & getMeta() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual Config & config()=0
virtual Overlay & overlay()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual OpenLedger & openLedger()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NodeStore::Database & getNodeStore()=0
virtual ServerHandler & getServerHandler()=0
virtual std::chrono::milliseconds getIOLatency()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual TimeKeeper & timeKeeper()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual ManifestCache & validatorManifests()=0
virtual perf::PerfLog & getPerfLog()=0
virtual Cluster & cluster()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of closures (of a specific type signature) that they previously registered.
std::string const & name() const
std::uint32_t getLoadFee() const
NetClock::time_point getReportTime() const
PublicKey const & identity() const
std::size_t size() const
The number of nodes in the cluster list.
std::string SERVER_DOMAIN
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
int RELAY_UNTRUSTED_VALIDATIONS
virtual void clearFailures()=0
virtual Json::Value getInfo()=0
std::shared_ptr< InfoSub > pointer
A pool of threads to perform work.
Json::Value getJson(int c=0)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
std::shared_ptr< Ledger const > getValidatedLedger()
bool haveValidated()
Whether we have ever fully validated a ledger.
std::shared_ptr< ReadView const > getCurrentLedger()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::size_t getFetchPackCacheSize() const
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
std::chrono::seconds getValidatedLedgerAge()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getLoadBase() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
State accounting records two attributes for each possible server state: 1) Amount of time spent in each state (in microseconds). 2) Number of transitions to each state.
void mode(OperatingMode om)
Record state transition.
void json(Json::Value &obj) const
Output state counters in JSON format.
std::array< Counters, 5 > counters_
std::uint64_t initialSyncUs_
CounterData getCounterData() const
std::chrono::steady_clock::time_point start_
static std::array< Json::StaticString const, 5 > const states_
std::chrono::steady_clock::time_point const processStart_
Transaction with input flags and results to be applied in batches.
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::shared_ptr< Transaction > const transaction
void processClusterTimer()
boost::asio::steady_timer accountHistoryTxTimer_
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
OperatingMode getOperatingMode() const override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
std::vector< TransactionStatus > mTransactions
bool unsubBookChanges(std::uint64_t uListener) override
std::atomic< OperatingMode > mMode
Json::Value getLedgerFetchInfo() override
bool isUNLBlocked() override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
void setNeedNetworkLedger() override
void setUNLBlocked() override
void pubConsensus(ConsensusPhase phase)
void transactionBatch()
Apply transactions in batches.
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
void setAmendmentBlocked() override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, ensuring that they are processed in one batch.
void clearUNLBlocked() override
boost::asio::steady_timer heartbeatTimer_
void updateLocalTx(ReadView const &view) override
bool unsubManifests(std::uint64_t uListener) override
DispatchState
Synchronization states for transaction batches.
std::optional< PublicKey > const validatorPK_
bool unsubTransactions(std::uint64_t uListener) override
void clearAmendmentWarned() override
std::size_t getLocalTxCount() override
std::unique_ptr< LocalTxs > m_localTX
bool subValidations(InfoSub::ref ispListener) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
~NetworkOPsImp() override
bool isAmendmentBlocked() override
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
SubAccountHistoryMapType mSubAccountHistory
Json::Value getServerInfo(bool human, bool admin, bool counters) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
boost::asio::steady_timer clusterTimer_
bool isAmendmentWarned() override
static std::array< char const *, 5 > const states_
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
std::atomic< bool > amendmentBlocked_
SubInfoMapType mSubAccount
std::optional< PublicKey > const validatorMasterPK_
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
Unsubscribe from an account's transactions.
std::set< uint256 > pendingValidations_
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool unsubValidations(std::uint64_t uListener) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply a batch of transactions and wait for this transaction to complete.
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
bool unsubPeerStatus(std::uint64_t uListener) override
void pubValidation(std::shared_ptr< STValidation > const &val) override
std::size_t const minPeerCount_
std::atomic< bool > unlBlocked_
bool subBook(InfoSub::ref ispListener, Book const &) override
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
void stateAccounting(Json::Value &obj) override
void submitTransaction(std::shared_ptr< STTx const > const &) override
bool unsubRTTransactions(std::uint64_t uListener) override
Json::Value getConsensusInfo() override
std::recursive_mutex mSubLock
std::atomic< bool > needNetworkLedger_
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
StateAccounting accounting_
void reportConsensusStateChange(ConsensusPhase phase)
bool subConsensus(InfoSub::ref ispListener) override
bool isNeedNetworkLedger() override
void setAmendmentWarned() override
bool processTrustedProposal(RCLCxPeerPos proposal) override
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
bool subPeerStatus(InfoSub::ref ispListener) override
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool tryRemoveRpcSub(std::string const &strUrl) override
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
LedgerMaster & m_ledgerMaster
void clearLedgerFetch() override
bool isBlocked() override
void consensusViewChange() override
void setStateTimer() override
Called to initially start our timers.
bool subManifests(InfoSub::ref ispListener) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool unsubServer(std::uint64_t uListener) override
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
ServerFeeSummary mLastFeeSummary
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void setStandAlone() override
bool subRTTransactions(InfoSub::ref ispListener) override
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
std::condition_variable mCond
void setMode(OperatingMode om) override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
void clearNeedNetworkLedger() override
NetworkOPsImp(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
DispatchState mDispatchState
bool subBookChanges(InfoSub::ref ispListener) override
SubInfoMapType mSubRTAccount
void reportFeeChange() override
void processHeartbeatTimer()
bool unsubBook(std::uint64_t uListener, Book const &) override
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
Subscribe to an account's new transactions and retrieve the account's historical transactions.
std::mutex validationsMutex_
void pubManifest(Manifest const &) override
ConsensusPhase mLastConsensusPhase
bool subTransactions(InfoSub::ref ispListener) override
std::atomic< bool > amendmentWarned_
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool unsubLedger(std::uint64_t uListener) override
std::string getHostId(bool forAdmin)
bool unsubConsensus(std::uint64_t uListener) override
Provides server functionality for clients.
void getCountsJson(Json::Value &obj)
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
BookListeners::pointer getBookListeners(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::uint64_t getPeerDisconnectCharges() const =0
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
Json::Value getJson(bool full) const
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
void setJson(Json::Value &) const
std::string getText() const override
Issue const & issue() const
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
void const * data() const noexcept
void setup(Setup const &setup, beast::Journal journal)
time_point now() const override
Returns the current time, using the server's clock.
std::chrono::seconds closeOffset() const
time_point closeTime() const
Returns the predicted close time, in network time.
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Validator keys and manifest as set in configuration file.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t quorum() const
Get quorum value for current trusted key set.
constexpr double decimalXRP() const
Json::Value jsonClipped() const
static constexpr std::size_t size()
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
T emplace_back(T... args)
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
std::string const & getVersionString()
Server version.
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet account(AccountID const &id) noexcept
AccountID root.
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::unique_ptr< NetworkOPs > make_NetworkOPs(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startvalid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
STAmount divide(STAmount const &amount, Rate const &rate)
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
std::uint64_t getQuality(uint256 const &uBase)
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
auto constexpr muldiv_max
std::unique_ptr< LocalTxs > make_LocalTxs()
STAmount amountFromQuality(std::uint64_t rate)
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
@ warnRPC_EXPIRED_VALIDATOR_LIST
@ warnRPC_UNSUPPORTED_MAJORITY
@ warnRPC_AMENDMENT_BLOCKED
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
OperatingMode
Specifies the mode under which the server believes it's operating.
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
STAmount multiply(STAmount const &amount, Rate const &rate)
AccountID calcAccountID(PublicKey const &pk)
@ current
This was a new validation and was added.
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
Json::Value rpcError(int iError)
bool isTefFailure(TER x) noexcept
ConsensusPhase
Phases of consensus for a single ledger round.
static std::array< char const *, 5 > const stateNames
std::string strHex(FwdIt begin, FwdIt end)
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
void forAllApiVersions(Fn const &fn, Args &&... args)
bool isTerRetry(TER x) noexcept
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
uint256 getQualityNext(uint256 const &uBase)
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j)
bool isTesSuccess(TER x) noexcept
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
std::string to_string_iso(date::sys_time< Duration > tp)
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
std::string to_string(base_uint< Bits, Tag > const &a)
FeeSetup setup_FeeVote(Section const &section)
bool isTemMalformed(TER x) noexcept
Number root(Number f, unsigned d)
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
constexpr std::size_t maxPoppedTransactions
bool transResultInfo(TER code, std::string &token, std::string &text)
bool isTelLocal(TER x) noexcept
uint256 getBookBase(Book const &book)
constexpr std::uint32_t tfInnerBatchTxn
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
static std::uint32_t trunc32(std::uint64_t v)
static auto const genesisAccountId
T set_intersection(T... args)
std::string serialized
The manifest in serialized form.
std::uint32_t sequence
The sequence number of this manifest.
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
std::optional< Blob > getSignature() const
Returns manifest signature.
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Blob getMasterSignature() const
Returns manifest master key signature.
PublicKey masterKey
The master key associated with this manifest.
Server fees published on server subscription.
bool operator!=(ServerFeeSummary const &b) const
ServerFeeSummary()=default
std::optional< TxQ::Metrics > em
std::uint32_t loadFactorServer
bool operator==(ServerFeeSummary const &b) const
std::uint32_t loadBaseServer
decltype(initialSyncUs_) initialSyncUs
decltype(counters_) counters
std::uint64_t transitions
std::chrono::microseconds dur
beast::insight::Gauge full_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Hook hook
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_transitions
beast::insight::Gauge full_duration
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
std::uint32_t historyLastLedgerSeq_
std::uint32_t separationLedgerSeq_
AccountID const accountId_
std::uint32_t forwardTxIndex_
std::atomic< bool > stopHistorical_
std::int32_t historyTxIndex_
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Data format for exchanging consumption information across peers.
std::vector< Item > items
Changes in trusted nodes after updating validator list.
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
IsMemberResult isMember(char const *key) const
void set(char const *key, auto const &v)
Select all peers (except optional excluded) that are in our cluster.
Sends a message to all peers.
T time_since_epoch(T... args)