rippled
Loading...
Searching...
No Matches
NetworkOPs.cpp
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLValidations.h>
3#include <xrpld/app/ledger/AcceptedLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerMaster.h>
6#include <xrpld/app/ledger/LedgerToJson.h>
7#include <xrpld/app/ledger/LocalTxs.h>
8#include <xrpld/app/ledger/OpenLedger.h>
9#include <xrpld/app/ledger/OrderBookDB.h>
10#include <xrpld/app/ledger/TransactionMaster.h>
11#include <xrpld/app/main/LoadManager.h>
12#include <xrpld/app/main/Tuning.h>
13#include <xrpld/app/misc/AmendmentTable.h>
14#include <xrpld/app/misc/DeliverMax.h>
15#include <xrpld/app/misc/HashRouter.h>
16#include <xrpld/app/misc/LoadFeeTrack.h>
17#include <xrpld/app/misc/NetworkOPs.h>
18#include <xrpld/app/misc/Transaction.h>
19#include <xrpld/app/misc/TxQ.h>
20#include <xrpld/app/misc/ValidatorKeys.h>
21#include <xrpld/app/misc/ValidatorList.h>
22#include <xrpld/app/misc/detail/AccountTxPaging.h>
23#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
24#include <xrpld/app/tx/apply.h>
25#include <xrpld/consensus/Consensus.h>
26#include <xrpld/consensus/ConsensusParms.h>
27#include <xrpld/overlay/Cluster.h>
28#include <xrpld/overlay/Overlay.h>
29#include <xrpld/overlay/predicates.h>
30#include <xrpld/perflog/PerfLog.h>
31#include <xrpld/rpc/BookChanges.h>
32#include <xrpld/rpc/CTID.h>
33#include <xrpld/rpc/DeliveredAmount.h>
34#include <xrpld/rpc/MPTokenIssuanceID.h>
35#include <xrpld/rpc/ServerHandler.h>
36
37#include <xrpl/basics/UptimeClock.h>
38#include <xrpl/basics/mulDiv.h>
39#include <xrpl/basics/safe_cast.h>
40#include <xrpl/basics/scope.h>
41#include <xrpl/beast/utility/rngfill.h>
42#include <xrpl/crypto/RFC1751.h>
43#include <xrpl/crypto/csprng.h>
44#include <xrpl/protocol/BuildInfo.h>
45#include <xrpl/protocol/Feature.h>
46#include <xrpl/protocol/MultiApiJson.h>
47#include <xrpl/protocol/NFTSyntheticSerializer.h>
48#include <xrpl/protocol/RPCErr.h>
49#include <xrpl/protocol/TxFlags.h>
50#include <xrpl/protocol/jss.h>
51#include <xrpl/resource/Fees.h>
52#include <xrpl/resource/ResourceManager.h>
53
54#include <boost/asio/ip/host_name.hpp>
55#include <boost/asio/steady_timer.hpp>
56
57#include <algorithm>
58#include <exception>
59#include <mutex>
60#include <optional>
61#include <set>
62#include <sstream>
63#include <string>
64#include <tuple>
65#include <unordered_map>
66
67namespace ripple {
68
69class NetworkOPsImp final : public NetworkOPs
70{
76 {
77 public:
79 bool const admin;
80 bool const local;
82 bool applied = false;
84
87 bool a,
88 bool l,
89 FailHard f)
90 : transaction(t), admin(a), local(l), failType(f)
91 {
92 XRPL_ASSERT(
94 "ripple::NetworkOPsImp::TransactionStatus::TransactionStatus : "
95 "valid inputs");
96 }
97 };
98
102 enum class DispatchState : unsigned char {
103 none,
104 scheduled,
105 running,
106 };
107
109
125 {
133
137 std::chrono::steady_clock::time_point start_ =
139 std::chrono::steady_clock::time_point const processStart_ = start_;
142
143 public:
145 {
147 .transitions = 1;
148 }
149
156 void
158
164 void
165 json(Json::Value& obj) const;
166
168 {
170 decltype(mode_) mode;
171 decltype(start_) start;
173 };
174
177 {
180 }
181 };
182
185 {
186 ServerFeeSummary() = default;
187
189 XRPAmount fee,
190 TxQ::Metrics&& escalationMetrics,
191 LoadFeeTrack const& loadFeeTrack);
192 bool
193 operator!=(ServerFeeSummary const& b) const;
194
195 bool
197 {
198 return !(*this != b);
199 }
200
205 };
206
207public:
209 Application& app,
211 bool standalone,
212 std::size_t minPeerCount,
213 bool start_valid,
214 JobQueue& job_queue,
216 ValidatorKeys const& validatorKeys,
217 boost::asio::io_context& io_svc,
218 beast::Journal journal,
219 beast::insight::Collector::ptr const& collector)
220 : app_(app)
221 , m_journal(journal)
224 , heartbeatTimer_(io_svc)
225 , clusterTimer_(io_svc)
226 , accountHistoryTxTimer_(io_svc)
227 , mConsensus(
228 app,
230 setup_FeeVote(app_.config().section("voting")),
231 app_.logs().journal("FeeVote")),
233 *m_localTX,
234 app.getInboundTransactions(),
235 beast::get_abstract_clock<std::chrono::steady_clock>(),
236 validatorKeys,
237 app_.logs().journal("LedgerConsensus"))
238 , validatorPK_(
239 validatorKeys.keys ? validatorKeys.keys->publicKey
240 : decltype(validatorPK_){})
242 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
243 : decltype(validatorMasterPK_){})
245 , m_job_queue(job_queue)
246 , m_standalone(standalone)
247 , minPeerCount_(start_valid ? 0 : minPeerCount)
248 , m_stats(std::bind(&NetworkOPsImp::collect_metrics, this), collector)
249 {
250 }
251
252 ~NetworkOPsImp() override
253 {
254 // This clear() is necessary to ensure the shared_ptrs in this map get
255 // destroyed NOW because the objects in this map invoke methods on this
256 // class when they are destroyed
258 }
259
260public:
262 getOperatingMode() const override;
263
265 strOperatingMode(OperatingMode const mode, bool const admin) const override;
266
268 strOperatingMode(bool const admin = false) const override;
269
270 //
271 // Transaction operations.
272 //
273
274 // Must complete immediately.
275 void
277
278 void
280 std::shared_ptr<Transaction>& transaction,
281 bool bUnlimited,
282 bool bLocal,
283 FailHard failType) override;
284
285 void
286 processTransactionSet(CanonicalTXSet const& set) override;
287
296 void
299 bool bUnlimited,
300 FailHard failType);
301
311 void
314 bool bUnlimited,
315 FailHard failtype);
316
317private:
318 bool
320
321 void
324 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
325
326public:
330 void
332
338 void
340
341 //
342 // Owner functions.
343 //
344
348 AccountID const& account) override;
349
350 //
351 // Book functions.
352 //
353
354 void
357 Book const&,
358 AccountID const& uTakerID,
359 bool const bProof,
360 unsigned int iLimit,
361 Json::Value const& jvMarker,
362 Json::Value& jvResult) override;
363
364 // Ledger proposal/close functions.
365 bool
367
368 bool
371 std::string const& source) override;
372
373 void
374 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
375
376 // Network state machine.
377
378 // Used for the "jump" case.
379private:
380 void
382 bool
384
385public:
386 bool
388 uint256 const& networkClosed,
389 std::unique_ptr<std::stringstream> const& clog) override;
390 void
392 void
393 setStandAlone() override;
394
398 void
399 setStateTimer() override;
400
401 void
402 setNeedNetworkLedger() override;
403 void
404 clearNeedNetworkLedger() override;
405 bool
406 isNeedNetworkLedger() override;
407 bool
408 isFull() override;
409
410 void
411 setMode(OperatingMode om) override;
412
413 bool
414 isBlocked() override;
415 bool
416 isAmendmentBlocked() override;
417 void
418 setAmendmentBlocked() override;
419 bool
420 isAmendmentWarned() override;
421 void
422 setAmendmentWarned() override;
423 void
424 clearAmendmentWarned() override;
425 bool
426 isUNLBlocked() override;
427 void
428 setUNLBlocked() override;
429 void
430 clearUNLBlocked() override;
431 void
432 consensusViewChange() override;
433
435 getConsensusInfo() override;
437 getServerInfo(bool human, bool admin, bool counters) override;
438 void
439 clearLedgerFetch() override;
441 getLedgerFetchInfo() override;
444 std::optional<std::chrono::milliseconds> consensusDelay) override;
445 void
446 reportFeeChange() override;
447 void
449
450 void
451 updateLocalTx(ReadView const& view) override;
453 getLocalTxCount() override;
454
455 //
456 // Monitoring: publisher side.
457 //
458 void
459 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
460 void
463 std::shared_ptr<STTx const> const& transaction,
464 TER result) override;
465 void
466 pubValidation(std::shared_ptr<STValidation> const& val) override;
467
468 //--------------------------------------------------------------------------
469 //
470 // InfoSub::Source.
471 //
472 void
474 InfoSub::ref ispListener,
475 hash_set<AccountID> const& vnaAccountIDs,
476 bool rt) override;
477 void
479 InfoSub::ref ispListener,
480 hash_set<AccountID> const& vnaAccountIDs,
481 bool rt) override;
482
483 // Just remove the subscription from the tracking
484 // not from the InfoSub. Needed for InfoSub destruction
485 void
487 std::uint64_t seq,
488 hash_set<AccountID> const& vnaAccountIDs,
489 bool rt) override;
490
492 subAccountHistory(InfoSub::ref ispListener, AccountID const& account)
493 override;
494 void
496 InfoSub::ref ispListener,
497 AccountID const& account,
498 bool historyOnly) override;
499
500 void
502 std::uint64_t seq,
503 AccountID const& account,
504 bool historyOnly) override;
505
506 bool
507 subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
508 bool
509 unsubLedger(std::uint64_t uListener) override;
510
511 bool
512 subBookChanges(InfoSub::ref ispListener) override;
513 bool
514 unsubBookChanges(std::uint64_t uListener) override;
515
516 bool
517 subServer(InfoSub::ref ispListener, Json::Value& jvResult, bool admin)
518 override;
519 bool
520 unsubServer(std::uint64_t uListener) override;
521
522 bool
523 subBook(InfoSub::ref ispListener, Book const&) override;
524 bool
525 unsubBook(std::uint64_t uListener, Book const&) override;
526
527 bool
528 subManifests(InfoSub::ref ispListener) override;
529 bool
530 unsubManifests(std::uint64_t uListener) override;
531 void
532 pubManifest(Manifest const&) override;
533
534 bool
535 subTransactions(InfoSub::ref ispListener) override;
536 bool
537 unsubTransactions(std::uint64_t uListener) override;
538
539 bool
540 subRTTransactions(InfoSub::ref ispListener) override;
541 bool
542 unsubRTTransactions(std::uint64_t uListener) override;
543
544 bool
545 subValidations(InfoSub::ref ispListener) override;
546 bool
547 unsubValidations(std::uint64_t uListener) override;
548
549 bool
550 subPeerStatus(InfoSub::ref ispListener) override;
551 bool
552 unsubPeerStatus(std::uint64_t uListener) override;
553 void
554 pubPeerStatus(std::function<Json::Value(void)> const&) override;
555
556 bool
557 subConsensus(InfoSub::ref ispListener) override;
558 bool
559 unsubConsensus(std::uint64_t uListener) override;
560
562 findRpcSub(std::string const& strUrl) override;
564 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
565 bool
566 tryRemoveRpcSub(std::string const& strUrl) override;
567
568 void
569 stop() override
570 {
571 {
572 try
573 {
574 heartbeatTimer_.cancel();
575 }
576 catch (boost::system::system_error const& e)
577 {
578 JLOG(m_journal.error())
579 << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
580 }
581
582 try
583 {
584 clusterTimer_.cancel();
585 }
586 catch (boost::system::system_error const& e)
587 {
588 JLOG(m_journal.error())
589 << "NetworkOPs: clusterTimer cancel error: " << e.what();
590 }
591
592 try
593 {
594 accountHistoryTxTimer_.cancel();
595 }
596 catch (boost::system::system_error const& e)
597 {
598 JLOG(m_journal.error())
599 << "NetworkOPs: accountHistoryTxTimer cancel error: "
600 << e.what();
601 }
602 }
603 // Make sure that any waitHandlers pending in our timers are done.
604 using namespace std::chrono_literals;
605 waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
606 }
607
608 void
609 stateAccounting(Json::Value& obj) override;
610
611private:
612 void
613 setTimer(
614 boost::asio::steady_timer& timer,
615 std::chrono::milliseconds const& expiry_time,
616 std::function<void()> onExpire,
617 std::function<void()> onError);
618 void
620 void
622 void
624 void
626
628 transJson(
629 std::shared_ptr<STTx const> const& transaction,
630 TER result,
631 bool validated,
634
635 void
638 AcceptedLedgerTx const& transaction,
639 bool last);
640
641 void
644 AcceptedLedgerTx const& transaction,
645 bool last);
646
647 void
650 std::shared_ptr<STTx const> const& transaction,
651 TER result);
652
653 void
654 pubServer();
655 void
657
659 getHostId(bool forAdmin);
660
661private:
665
666 /*
667 * With a validated ledger to separate history and future, the node
668 * streams historical txns with negative indexes starting from -1,
669 * and streams future txns starting from index 0.
670 * The SubAccountHistoryIndex struct maintains these indexes.
671 * It also has a flag stopHistorical_ for stopping streaming
672 * the historical txns.
673 */
710
714 void
718 void
720 void
722
725
727
729
731
736
738 boost::asio::steady_timer heartbeatTimer_;
739 boost::asio::steady_timer clusterTimer_;
740 boost::asio::steady_timer accountHistoryTxTimer_;
741
743
746
748
750
753
755
757
758 enum SubTypes {
759 sLedger, // Accepted ledgers.
760 sManifests, // Received validator manifests.
761 sServer, // When server changes connectivity state.
762 sTransactions, // All accepted transactions.
763 sRTTransactions, // All proposed and accepted transactions.
764 sValidations, // Received validations.
765 sPeerStatus, // Peer status changes.
766 sConsensusPhase, // Consensus phase
767 sBookChanges, // Per-ledger order book changes
768 sLastEntry // Any new entry must be ADDED ABOVE this one
769 };
770
772
774
776
777 // Whether we are in standalone mode.
778 bool const m_standalone;
779
780 // The number of nodes that we need to consider ourselves connected.
782
783 // Transaction batching.
788
790
793
794private:
795 struct Stats
796 {
797 template <class Handler>
799 Handler const& handler,
800 beast::insight::Collector::ptr const& collector)
801 : hook(collector->make_hook(handler))
802 , disconnected_duration(collector->make_gauge(
803 "State_Accounting",
804 "Disconnected_duration"))
805 , connected_duration(collector->make_gauge(
806 "State_Accounting",
807 "Connected_duration"))
809 collector->make_gauge("State_Accounting", "Syncing_duration"))
810 , tracking_duration(collector->make_gauge(
811 "State_Accounting",
812 "Tracking_duration"))
814 collector->make_gauge("State_Accounting", "Full_duration"))
815 , disconnected_transitions(collector->make_gauge(
816 "State_Accounting",
817 "Disconnected_transitions"))
818 , connected_transitions(collector->make_gauge(
819 "State_Accounting",
820 "Connected_transitions"))
821 , syncing_transitions(collector->make_gauge(
822 "State_Accounting",
823 "Syncing_transitions"))
824 , tracking_transitions(collector->make_gauge(
825 "State_Accounting",
826 "Tracking_transitions"))
828 collector->make_gauge("State_Accounting", "Full_transitions"))
829 {
830 }
831
838
844 };
845
846 std::mutex m_statsMutex; // Mutex to lock m_stats
848
849private:
850 void
852};
853
854//------------------------------------------------------------------------------
855
857 {"disconnected", "connected", "syncing", "tracking", "full"}};
858
860
868
869static auto const genesisAccountId = calcAccountID(
871 .first);
872
873//------------------------------------------------------------------------------
874inline OperatingMode
876{
877 return mMode;
878}
879
880inline std::string
881NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
882{
883 return strOperatingMode(mMode, admin);
884}
885
886inline void
891
892inline void
897
898inline void
903
904inline bool
909
910inline bool
915
918{
919 static std::string const hostname = boost::asio::ip::host_name();
920
921 if (forAdmin)
922 return hostname;
923
924 // For non-admin uses hash the node public key into a
925 // single RFC1751 word:
926 static std::string const shroudedHostId = [this]() {
927 auto const& id = app_.nodeIdentity();
928
929 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
930 }();
931
932 return shroudedHostId;
933}
934
935void
937{
939
940 // Only do this work if a cluster is configured
941 if (app_.cluster().size() != 0)
943}
944
945void
947 boost::asio::steady_timer& timer,
948 std::chrono::milliseconds const& expiry_time,
949 std::function<void()> onExpire,
950 std::function<void()> onError)
951{
952 // Only start the timer if waitHandlerCounter_ is not yet joined.
953 if (auto optionalCountedHandler = waitHandlerCounter_.wrap(
954 [this, onExpire, onError](boost::system::error_code const& e) {
955 if ((e.value() == boost::system::errc::success) &&
956 (!m_job_queue.isStopped()))
957 {
958 onExpire();
959 }
960 // Recover as best we can if an unexpected error occurs.
961 if (e.value() != boost::system::errc::success &&
962 e.value() != boost::asio::error::operation_aborted)
963 {
964 // Try again later and hope for the best.
965 JLOG(m_journal.error())
966 << "Timer got error '" << e.message()
967 << "'. Restarting timer.";
968 onError();
969 }
970 }))
971 {
972 timer.expires_after(expiry_time);
973 timer.async_wait(std::move(*optionalCountedHandler));
974 }
975}
976
977void
978NetworkOPsImp::setHeartbeatTimer()
979{
980 setTimer(
981 heartbeatTimer_,
982 mConsensus.parms().ledgerGRANULARITY,
983 [this]() {
984 m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
985 processHeartbeatTimer();
986 });
987 },
988 [this]() { setHeartbeatTimer(); });
989}
990
991void
992NetworkOPsImp::setClusterTimer()
993{
994 using namespace std::chrono_literals;
995
996 setTimer(
997 clusterTimer_,
998 10s,
999 [this]() {
1000 m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
1001 processClusterTimer();
1002 });
1003 },
1004 [this]() { setClusterTimer(); });
1005}
1006
1007void
1008NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
1009{
1010 JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
1011 << toBase58(subInfo.index_->accountId_);
1012 using namespace std::chrono_literals;
1013 setTimer(
1014 accountHistoryTxTimer_,
1015 4s,
1016 [this, subInfo]() { addAccountHistoryJob(subInfo); },
1017 [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
1018}
1019
1020void
1021NetworkOPsImp::processHeartbeatTimer()
1022{
1023 RclConsensusLogger clog(
1024 "Heartbeat Timer", mConsensus.validating(), m_journal);
1025 {
1026 std::unique_lock lock{app_.getMasterMutex()};
1027
1028 // VFALCO NOTE This is for diagnosing a crash on exit
1029 LoadManager& mgr(app_.getLoadManager());
1030 mgr.heartbeat();
1031
1032 std::size_t const numPeers = app_.overlay().size();
1033
1034 // do we have sufficient peers? If not, we are disconnected.
1035 if (numPeers < minPeerCount_)
1036 {
1037 if (mMode != OperatingMode::DISCONNECTED)
1038 {
1039 setMode(OperatingMode::DISCONNECTED);
1041 ss << "Node count (" << numPeers << ") has fallen "
1042 << "below required minimum (" << minPeerCount_ << ").";
1043 JLOG(m_journal.warn()) << ss.str();
1044 CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
1045 }
1046 else
1047 {
1048 CLOG(clog.ss())
1049 << "already DISCONNECTED. too few peers (" << numPeers
1050 << "), need at least " << minPeerCount_;
1051 }
1052
1053 // MasterMutex lock need not be held to call setHeartbeatTimer()
1054 lock.unlock();
1055 // We do not call mConsensus.timerEntry until there are enough
1056 // peers providing meaningful inputs to consensus
1057 setHeartbeatTimer();
1058
1059 return;
1060 }
1061
1062 if (mMode == OperatingMode::DISCONNECTED)
1063 {
1064 setMode(OperatingMode::CONNECTED);
1065 JLOG(m_journal.info())
1066 << "Node count (" << numPeers << ") is sufficient.";
1067 CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
1068 << " peers. ";
1069 }
1070
1071 // Check if the last validated ledger forces a change between these
1072 // states.
1073 auto origMode = mMode.load();
1074 CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
1075 if (mMode == OperatingMode::SYNCING)
1076 setMode(OperatingMode::SYNCING);
1077 else if (mMode == OperatingMode::CONNECTED)
1078 setMode(OperatingMode::CONNECTED);
1079 auto newMode = mMode.load();
1080 if (origMode != newMode)
1081 {
1082 CLOG(clog.ss())
1083 << ", changing to " << strOperatingMode(newMode, true);
1084 }
1085 CLOG(clog.ss()) << ". ";
1086 }
1087
1088 mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());
1089
1090 CLOG(clog.ss()) << "consensus phase " << to_string(mLastConsensusPhase);
1091 ConsensusPhase const currPhase = mConsensus.phase();
1092 if (mLastConsensusPhase != currPhase)
1093 {
1094 reportConsensusStateChange(currPhase);
1095 mLastConsensusPhase = currPhase;
1096 CLOG(clog.ss()) << " changed to " << to_string(mLastConsensusPhase);
1097 }
1098 CLOG(clog.ss()) << ". ";
1099
1100 setHeartbeatTimer();
1101}
1102
1103void
1104NetworkOPsImp::processClusterTimer()
1105{
1106 if (app_.cluster().size() == 0)
1107 return;
1108
1109 using namespace std::chrono_literals;
1110
1111 bool const update = app_.cluster().update(
1112 app_.nodeIdentity().first,
1113 "",
1114 (m_ledgerMaster.getValidatedLedgerAge() <= 4min)
1115 ? app_.getFeeTrack().getLocalFee()
1116 : 0,
1117 app_.timeKeeper().now());
1118
1119 if (!update)
1120 {
1121 JLOG(m_journal.debug()) << "Too soon to send cluster update";
1122 setClusterTimer();
1123 return;
1124 }
1125
1126 protocol::TMCluster cluster;
1127 app_.cluster().for_each([&cluster](ClusterNode const& node) {
1128 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1129 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1130 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1131 n.set_nodeload(node.getLoadFee());
1132 if (!node.name().empty())
1133 n.set_nodename(node.name());
1134 });
1135
1136 Resource::Gossip gossip = app_.getResourceManager().exportConsumers();
1137 for (auto& item : gossip.items)
1138 {
1139 protocol::TMLoadSource& node = *cluster.add_loadsources();
1140 node.set_name(to_string(item.address));
1141 node.set_cost(item.balance);
1142 }
1143 app_.overlay().foreach(send_if(
1144 std::make_shared<Message>(cluster, protocol::mtCLUSTER),
1145 peer_in_cluster()));
1146 setClusterTimer();
1147}
1148
1149//------------------------------------------------------------------------------
1150
1152NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
1153 const
1154{
1155 if (mode == OperatingMode::FULL && admin)
1156 {
1157 auto const consensusMode = mConsensus.mode();
1158 if (consensusMode != ConsensusMode::wrongLedger)
1159 {
1160 if (consensusMode == ConsensusMode::proposing)
1161 return "proposing";
1162
1163 if (mConsensus.validating())
1164 return "validating";
1165 }
1166 }
1167
1168 return states_[static_cast<std::size_t>(mode)];
1169}
1170
1171void
1172NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
1173{
1174 if (isNeedNetworkLedger())
1175 {
1176 // Nothing we can do if we've never been in sync
1177 return;
1178 }
1179
1180 // Enforce Network bar for batch txn
1181 if (iTrans->isFlag(tfInnerBatchTxn) &&
1182 m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1183 {
1184 JLOG(m_journal.error())
1185 << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1186 return;
1187 }
1188
1189 // this is an asynchronous interface
1190 auto const trans = sterilize(*iTrans);
1191
1192 auto const txid = trans->getTransactionID();
1193 auto const flags = app_.getHashRouter().getFlags(txid);
1194
1195 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1196 {
1197 JLOG(m_journal.warn()) << "Submitted transaction cached bad";
1198 return;
1199 }
1200
1201 try
1202 {
1203 auto const [validity, reason] = checkValidity(
1204 app_.getHashRouter(),
1205 *trans,
1206 m_ledgerMaster.getValidatedRules(),
1207 app_.config());
1208
1209 if (validity != Validity::Valid)
1210 {
1211 JLOG(m_journal.warn())
1212 << "Submitted transaction invalid: " << reason;
1213 return;
1214 }
1215 }
1216 catch (std::exception const& ex)
1217 {
1218 JLOG(m_journal.warn())
1219 << "Exception checking transaction " << txid << ": " << ex.what();
1220
1221 return;
1222 }
1223
1224 std::string reason;
1225
1226 auto tx = std::make_shared<Transaction>(trans, reason, app_);
1227
1228 m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
1229 auto t = tx;
1230 processTransaction(t, false, false, FailHard::no);
1231 });
1232}
1233
1234bool
1235NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
1236{
1237 auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
1238
1239 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1240 {
1241 // cached bad
1242 JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
1243 transaction->setStatus(INVALID);
1244 transaction->setResult(temBAD_SIGNATURE);
1245 return false;
1246 }
1247
1248 auto const view = m_ledgerMaster.getCurrentLedger();
1249
1250 // This function is called by several different parts of the codebase
1251 // under no circumstances will we ever accept an inner txn within a batch
1252 // txn from the network.
1253 auto const sttx = *transaction->getSTransaction();
1254 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1255 {
1256 transaction->setStatus(INVALID);
1257 transaction->setResult(temINVALID_FLAG);
1258 app_.getHashRouter().setFlags(
1259 transaction->getID(), HashRouterFlags::BAD);
1260 return false;
1261 }
1262
1263 // NOTE eahennis - I think this check is redundant,
1264 // but I'm not 100% sure yet.
1265 // If so, only cost is looking up HashRouter flags.
1266 auto const [validity, reason] =
1267 checkValidity(app_.getHashRouter(), sttx, view->rules(), app_.config());
1268 XRPL_ASSERT(
1269 validity == Validity::Valid,
1270 "ripple::NetworkOPsImp::processTransaction : valid validity");
1271
1272 // Not concerned with local checks at this point.
1273 if (validity == Validity::SigBad)
1274 {
1275 JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
1276 transaction->setStatus(INVALID);
1277 transaction->setResult(temBAD_SIGNATURE);
1278 app_.getHashRouter().setFlags(
1279 transaction->getID(), HashRouterFlags::BAD);
1280 return false;
1281 }
1282
1283 // canonicalize can change our pointer
1284 app_.getMasterTransaction().canonicalize(&transaction);
1285
1286 return true;
1287}
1288
1289void
1290NetworkOPsImp::processTransaction(
1291 std::shared_ptr<Transaction>& transaction,
1292 bool bUnlimited,
1293 bool bLocal,
1294 FailHard failType)
1295{
1296 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
1297
1298 // preProcessTransaction can change our pointer
1299 if (!preProcessTransaction(transaction))
1300 return;
1301
1302 if (bLocal)
1303 doTransactionSync(transaction, bUnlimited, failType);
1304 else
1305 doTransactionAsync(transaction, bUnlimited, failType);
1306}
1307
1308void
1309NetworkOPsImp::doTransactionAsync(
1310 std::shared_ptr<Transaction> transaction,
1311 bool bUnlimited,
1312 FailHard failType)
1313{
1314 std::lock_guard lock(mMutex);
1315
1316 if (transaction->getApplying())
1317 return;
1318
1319 mTransactions.push_back(
1320 TransactionStatus(transaction, bUnlimited, false, failType));
1321 transaction->setApplying();
1322
1323 if (mDispatchState == DispatchState::none)
1324 {
1325 if (m_job_queue.addJob(
1326 jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
1327 {
1328 mDispatchState = DispatchState::scheduled;
1329 }
1330 }
1331}
1332
1333void
1334NetworkOPsImp::doTransactionSync(
1335 std::shared_ptr<Transaction> transaction,
1336 bool bUnlimited,
1337 FailHard failType)
1338{
1339 std::unique_lock<std::mutex> lock(mMutex);
1340
1341 if (!transaction->getApplying())
1342 {
1343 mTransactions.push_back(
1344 TransactionStatus(transaction, bUnlimited, true, failType));
1345 transaction->setApplying();
1346 }
1347
1348 doTransactionSyncBatch(
1349 lock, [&transaction](std::unique_lock<std::mutex> const&) {
1350 return transaction->getApplying();
1351 });
1352}
1353
1354void
1355NetworkOPsImp::doTransactionSyncBatch(
1357 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
1358{
1359 do
1360 {
1361 if (mDispatchState == DispatchState::running)
1362 {
1363 // A batch processing job is already running, so wait.
1364 mCond.wait(lock);
1365 }
1366 else
1367 {
1368 apply(lock);
1369
1370 if (mTransactions.size())
1371 {
1372 // More transactions need to be applied, but by another job.
1373 if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
1374 transactionBatch();
1375 }))
1376 {
1377 mDispatchState = DispatchState::scheduled;
1378 }
1379 }
1380 }
1381 } while (retryCallback(lock));
1382}
1383
1384void
1385NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
1386{
1387 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
1389 candidates.reserve(set.size());
1390 for (auto const& [_, tx] : set)
1391 {
1392 std::string reason;
1393 auto transaction = std::make_shared<Transaction>(tx, reason, app_);
1394
1395 if (transaction->getStatus() == INVALID)
1396 {
1397 if (!reason.empty())
1398 {
1399 JLOG(m_journal.trace())
1400 << "Exception checking transaction: " << reason;
1401 }
1402 app_.getHashRouter().setFlags(
1403 tx->getTransactionID(), HashRouterFlags::BAD);
1404 continue;
1405 }
1406
1407 // preProcessTransaction can change our pointer
1408 if (!preProcessTransaction(transaction))
1409 continue;
1410
1411 candidates.emplace_back(transaction);
1412 }
1413
1414 std::vector<TransactionStatus> transactions;
1415 transactions.reserve(candidates.size());
1416
1417 std::unique_lock lock(mMutex);
1418
1419 for (auto& transaction : candidates)
1420 {
1421 if (!transaction->getApplying())
1422 {
1423 transactions.emplace_back(transaction, false, false, FailHard::no);
1424 transaction->setApplying();
1425 }
1426 }
1427
1428 if (mTransactions.empty())
1429 mTransactions.swap(transactions);
1430 else
1431 {
1432 mTransactions.reserve(mTransactions.size() + transactions.size());
1433 for (auto& t : transactions)
1434 mTransactions.push_back(std::move(t));
1435 }
1436 if (mTransactions.empty())
1437 {
1438 JLOG(m_journal.debug()) << "No transaction to process!";
1439 return;
1440 }
1441
1442 doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
1443 XRPL_ASSERT(
1444 lock.owns_lock(),
1445 "ripple::NetworkOPsImp::processTransactionSet has lock");
1446 return std::any_of(
1447 mTransactions.begin(), mTransactions.end(), [](auto const& t) {
1448 return t.transaction->getApplying();
1449 });
1450 });
1451}
1452
1453void
1454NetworkOPsImp::transactionBatch()
1455{
1456 std::unique_lock<std::mutex> lock(mMutex);
1457
1458 if (mDispatchState == DispatchState::running)
1459 return;
1460
1461 while (mTransactions.size())
1462 {
1463 apply(lock);
1464 }
1465}
1466
// Apply one swapped-out batch of submitted transactions to the open
// ledger via the TxQ, then publish/relay results and hold or retry each
// transaction according to its engine result.
//
// Precondition: batchLock (guarding mTransactions / mDispatchState) is
// held on entry; it is released during ledger work and re-acquired for
// the final bookkeeping.
1467void
1468NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
1469{
    // NOTE(review): this extract appears to be missing a line here --
    // likely the declaration of `submit_held` used below (presumably
    // std::vector<TransactionStatus>). Confirm against the full file.
1471 std::vector<TransactionStatus> transactions;
    // Take ownership of the pending batch so the lock can be dropped.
1472 mTransactions.swap(transactions);
1473 XRPL_ASSERT(
1474 !transactions.empty(),
1475 "ripple::NetworkOPsImp::apply : non-empty transactions");
1476 XRPL_ASSERT(
1477 mDispatchState != DispatchState::running,
1478 "ripple::NetworkOPsImp::apply : is not running");
1479
1480 mDispatchState = DispatchState::running;
1481
1482 batchLock.unlock();
1483
1484 {
        // Both mutexes are acquired together via std::lock below to
        // avoid lock-ordering deadlocks.
1485 std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
1486 bool changed = false;
1487 {
1488 std::unique_lock ledgerLock{
1489 m_ledgerMaster.peekMutex(), std::defer_lock};
1490 std::lock(masterLock, ledgerLock);
1491
1492 app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
1493 for (TransactionStatus& e : transactions)
1494 {
1495 // we check before adding to the batch
1496 ApplyFlags flags = tapNONE;
1497 if (e.admin)
1498 flags |= tapUNLIMITED;
1499
1500 if (e.failType == FailHard::yes)
1501 flags |= tapFAIL_HARD;
1502
1503 auto const result = app_.getTxQ().apply(
1504 app_, view, e.transaction->getSTransaction(), flags, j);
1505 e.result = result.ter;
1506 e.applied = result.applied;
1507 changed = changed || result.applied;
1508 }
1509 return changed;
1510 });
1511 }
        // Any applied transaction can change open-ledger fee metrics.
1512 if (changed)
1513 reportFeeChange();
1514
1515 std::optional<LedgerIndex> validatedLedgerIndex;
1516 if (auto const l = m_ledgerMaster.getValidatedLedger())
1517 validatedLedgerIndex = l->info().seq;
1518
1519 auto newOL = app_.openLedger().current();
        // Per-transaction post-processing: publish, set status, decide
        // whether to hold, relay, and/or re-queue follow-on txns.
1520 for (TransactionStatus& e : transactions)
1521 {
1522 e.transaction->clearSubmitResult();
1523
1524 if (e.applied)
1525 {
1526 pubProposedTransaction(
1527 newOL, e.transaction->getSTransaction(), e.result);
1528 e.transaction->setApplied();
1529 }
1530
1531 e.transaction->setResult(e.result);
1532
            // Malformed transactions are flagged so they are never
            // held or retried.
1533 if (isTemMalformed(e.result))
1534 app_.getHashRouter().setFlags(
1535 e.transaction->getID(), HashRouterFlags::BAD);
1536
1537#ifdef DEBUG
1538 if (e.result != tesSUCCESS)
1539 {
1540 std::string token, human;
1541
1542 if (transResultInfo(e.result, token, human))
1543 {
1544 JLOG(m_journal.info())
1545 << "TransactionResult: " << token << ": " << human;
1546 }
1547 }
1548#endif
1549
1550 bool addLocal = e.local;
1551
1552 if (e.result == tesSUCCESS)
1553 {
1554 JLOG(m_journal.debug())
1555 << "Transaction is now included in open ledger";
1556 e.transaction->setStatus(INCLUDED);
1557
1558 // Pop as many "reasonable" transactions for this account as
1559 // possible. "Reasonable" means they have sequential sequence
1560 // numbers, or use tickets.
1561 auto const& txCur = e.transaction->getSTransaction();
1562
1563 std::size_t count = 0;
1564 for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
1565 txNext && count < maxPoppedTransactions;
1566 txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
1567 {
                    // batchLock must be held while touching submit_held
                    // and the applying flag.
1568 if (!batchLock.owns_lock())
1569 batchLock.lock();
1570 std::string reason;
1571 auto const trans = sterilize(*txNext);
1572 auto t = std::make_shared<Transaction>(trans, reason, app_);
1573 if (t->getApplying())
1574 break;
1575 submit_held.emplace_back(t, false, false, FailHard::no);
1576 t->setApplying();
1577 }
1578 if (batchLock.owns_lock())
1579 batchLock.unlock();
1580 }
1581 else if (e.result == tefPAST_SEQ)
1582 {
1583 // duplicate or conflict
1584 JLOG(m_journal.info()) << "Transaction is obsolete";
1585 e.transaction->setStatus(OBSOLETE);
1586 }
1587 else if (e.result == terQUEUED)
1588 {
1589 JLOG(m_journal.debug())
1590 << "Transaction is likely to claim a"
1591 << " fee, but is queued until fee drops";
1592
1593 e.transaction->setStatus(HELD);
1594 // Add to held transactions, because it could get
1595 // kicked out of the queue, and this will try to
1596 // put it back.
1597 m_ledgerMaster.addHeldTransaction(e.transaction);
1598 e.transaction->setQueued();
1599 e.transaction->setKept();
1600 }
1601 else if (
1602 isTerRetry(e.result) || isTelLocal(e.result) ||
1603 isTefFailure(e.result))
1604 {
1605 if (e.failType != FailHard::yes)
1606 {
1607 auto const lastLedgerSeq =
1608 e.transaction->getSTransaction()->at(
1609 ~sfLastLedgerSequence);
1610 auto const ledgersLeft = lastLedgerSeq
1611 ? *lastLedgerSeq -
1612 m_ledgerMaster.getCurrentLedgerIndex()
                    // NOTE(review): this extract appears to be missing
                    // the ternary's else branch here (likely an empty
                    // std::optional). Confirm against the full file.
1614 // If any of these conditions are met, the transaction can
1615 // be held:
1616 // 1. It was submitted locally. (Note that this flag is only
1617 // true on the initial submission.)
1618 // 2. The transaction has a LastLedgerSequence, and the
1619 // LastLedgerSequence is fewer than LocalTxs::holdLedgers
1620 // (5) ledgers into the future. (Remember that an
1621 // unseated optional compares as less than all seated
1622 // values, so it has to be checked explicitly first.)
1623 // 3. The HashRouterFlags::BAD flag is not set on the txID.
1624 // (setFlags
1625 // checks before setting. If the flag is set, it returns
1626 // false, which means it's been held once without one of
1627 // the other conditions, so don't hold it again. Time's
1628 // up!)
1629 //
1630 if (e.local ||
1631 (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
1632 app_.getHashRouter().setFlags(
1633 e.transaction->getID(), HashRouterFlags::HELD))
1634 {
1635 // transaction should be held
1636 JLOG(m_journal.debug())
1637 << "Transaction should be held: " << e.result;
1638 e.transaction->setStatus(HELD);
1639 m_ledgerMaster.addHeldTransaction(e.transaction);
1640 e.transaction->setKept();
1641 }
1642 else
1643 JLOG(m_journal.debug())
1644 << "Not holding transaction "
1645 << e.transaction->getID() << ": "
1646 << (e.local ? "local" : "network") << ", "
1647 << "result: " << e.result << " ledgers left: "
1648 << (ledgersLeft ? to_string(*ledgersLeft)
1649 : "unspecified");
1650 }
1651 }
1652 else
1653 {
1654 JLOG(m_journal.debug())
1655 << "Status other than success " << e.result;
1656 e.transaction->setStatus(INVALID);
1657 }
1658
            // fail_hard + non-success: caller demanded no persistence
            // or relaying on failure.
1659 auto const enforceFailHard =
1660 e.failType == FailHard::yes && !isTesSuccess(e.result);
1661
1662 if (addLocal && !enforceFailHard)
1663 {
1664 m_localTX->push_back(
1665 m_ledgerMaster.getCurrentLedgerIndex(),
1666 e.transaction->getSTransaction());
1667 e.transaction->setKept();
1668 }
1669
1670 if ((e.applied ||
1671 ((mMode != OperatingMode::FULL) &&
1672 (e.failType != FailHard::yes) && e.local) ||
1673 (e.result == terQUEUED)) &&
1674 !enforceFailHard)
1675 {
1676 auto const toSkip =
1677 app_.getHashRouter().shouldRelay(e.transaction->getID());
1678 if (auto const sttx = *(e.transaction->getSTransaction());
1679 toSkip &&
1680 // Skip relaying if it's an inner batch txn and batch
1681 // feature is enabled
1682 !(sttx.isFlag(tfInnerBatchTxn) &&
1683 newOL->rules().enabled(featureBatch)))
1684 {
1685 protocol::TMTransaction tx;
1686 Serializer s;
1687
1688 sttx.add(s);
1689 tx.set_rawtransaction(s.data(), s.size());
1690 tx.set_status(protocol::tsCURRENT);
1691 tx.set_receivetimestamp(
1692 app_.timeKeeper().now().time_since_epoch().count());
1693 tx.set_deferred(e.result == terQUEUED);
1694 // FIXME: This should be when we received it
1695 app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
1696 e.transaction->setBroadcast();
1697 }
1698 }
1699
1700 if (validatedLedgerIndex)
1701 {
1702 auto [fee, accountSeq, availableSeq] =
1703 app_.getTxQ().getTxRequiredFeeAndSeq(
1704 *newOL, e.transaction->getSTransaction());
1705 e.transaction->setCurrentLedgerState(
1706 *validatedLedgerIndex, fee, accountSeq, availableSeq);
1707 }
1708 }
1709 }
1710
    // Re-acquire the batch lock for the final bookkeeping below.
1711 batchLock.lock();
1712
1713 for (TransactionStatus& e : transactions)
1714 e.transaction->clearApplying();
1715
    // Queue any account-follow-on transactions popped above so the next
    // batch pass picks them up.
1716 if (!submit_held.empty())
1717 {
1718 if (mTransactions.empty())
1719 mTransactions.swap(submit_held);
1720 else
1721 {
1722 mTransactions.reserve(mTransactions.size() + submit_held.size());
1723 for (auto& e : submit_held)
1724 mTransactions.push_back(std::move(e));
1725 }
1726 }
1727
1728 mCond.notify_all();
1729
1730 mDispatchState = DispatchState::none;
1731}
1732
1733//
1734// Owner functions
1735//
1736
// Walk an account's owner directory (all pages) and collect the owned
// ledger objects into a JSON object: offers under "offers", trust lines
// under "ripple_lines".
// NOTE(review): this extract appears to be missing the return-type line
// (likely Json::Value) and the first parameter (likely the ReadView /
// ledger pointer `lpLedger` used below). Confirm against the full file.
1738NetworkOPsImp::getOwnerInfo(
1740 AccountID const& account)
1741{
1742 Json::Value jvObjects(Json::objectValue);
1743 auto root = keylet::ownerDir(account);
    // Read the first page of the owner directory.
1744 auto sleNode = lpLedger->read(keylet::page(root));
1745 if (sleNode)
1746 {
1747 std::uint64_t uNodeDir;
1748
        // Iterate directory pages, following sfIndexNext links.
1749 do
1750 {
1751 for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1752 {
1753 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1754 XRPL_ASSERT(
1755 sleCur,
1756 "ripple::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1757
1758 switch (sleCur->getType())
1759 {
1760 case ltOFFER:
                        // NOTE(review): the initializer is missing from
                        // this extract (likely Json::Value(arrayValue)).
1761 if (!jvObjects.isMember(jss::offers))
1762 jvObjects[jss::offers] =
1764
1765 jvObjects[jss::offers].append(
1766 sleCur->getJson(JsonOptions::none));
1767 break;
1768
1769 case ltRIPPLE_STATE:
1770 if (!jvObjects.isMember(jss::ripple_lines))
1771 {
                            // NOTE(review): initializer also missing
                            // here in this extract.
1772 jvObjects[jss::ripple_lines] =
1774 }
1775
1776 jvObjects[jss::ripple_lines].append(
1777 sleCur->getJson(JsonOptions::none));
1778 break;
1779
1780 case ltACCOUNT_ROOT:
1781 case ltDIR_NODE:
1782 // LCOV_EXCL_START
1783 default:
1784 UNREACHABLE(
1785 "ripple::NetworkOPsImp::getOwnerInfo : invalid "
1786 "type");
1787 break;
1788 // LCOV_EXCL_STOP
1789 }
1790 }
1791
1792 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1793
1794 if (uNodeDir)
1795 {
1796 sleNode = lpLedger->read(keylet::page(root, uNodeDir));
1797 XRPL_ASSERT(
1798 sleNode,
1799 "ripple::NetworkOPsImp::getOwnerInfo : read next page");
1800 }
1801 } while (uNodeDir);
1802 }
1803
1804 return jvObjects;
1805}
1806
1807//
1808// Other
1809//
1810
1811inline bool
1812NetworkOPsImp::isBlocked()
1813{
1814 return isAmendmentBlocked() || isUNLBlocked();
1815}
1816
1817inline bool
1818NetworkOPsImp::isAmendmentBlocked()
1819{
1820 return amendmentBlocked_;
1821}
1822
1823void
1824NetworkOPsImp::setAmendmentBlocked()
1825{
1826 amendmentBlocked_ = true;
1827 setMode(OperatingMode::CONNECTED);
1828}
1829
1830inline bool
1831NetworkOPsImp::isAmendmentWarned()
1832{
1833 return !amendmentBlocked_ && amendmentWarned_;
1834}
1835
1836inline void
1837NetworkOPsImp::setAmendmentWarned()
1838{
1839 amendmentWarned_ = true;
1840}
1841
1842inline void
1843NetworkOPsImp::clearAmendmentWarned()
1844{
1845 amendmentWarned_ = false;
1846}
1847
1848inline bool
1849NetworkOPsImp::isUNLBlocked()
1850{
1851 return unlBlocked_;
1852}
1853
1854void
1855NetworkOPsImp::setUNLBlocked()
1856{
1857 unlBlocked_ = true;
1858 setMode(OperatingMode::CONNECTED);
1859}
1860
1861inline void
1862NetworkOPsImp::clearUNLBlocked()
1863{
1864 unlBlocked_ = false;
1865}
1866
1867bool
1868NetworkOPsImp::checkLastClosedLedger(
1869 Overlay::PeerSequence const& peerList,
1870 uint256& networkClosed)
1871{
1872 // Returns true if there's an *abnormal* ledger issue, normal changing in
1873 // TRACKING mode should return false. Do we have sufficient validations for
1874 // our last closed ledger? Or do sufficient nodes agree? And do we have no
1875 // better ledger available? If so, we are either tracking or full.
1876
1877 JLOG(m_journal.trace()) << "NetworkOPsImp::checkLastClosedLedger";
1878
1879 auto const ourClosed = m_ledgerMaster.getClosedLedger();
1880
1881 if (!ourClosed)
1882 return false;
1883
1884 uint256 closedLedger = ourClosed->info().hash;
1885 uint256 prevClosedLedger = ourClosed->info().parentHash;
1886 JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
1887 JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;
1888
1889 //-------------------------------------------------------------------------
1890 // Determine preferred last closed ledger
1891
1892 auto& validations = app_.getValidations();
1893 JLOG(m_journal.debug())
1894 << "ValidationTrie " << Json::Compact(validations.getJsonTrie());
1895
1896 // Will rely on peer LCL if no trusted validations exist
    // NOTE(review): the declaration of `peerCounts` appears to be missing
    // from this extract (likely a map from ledger hash to a count).
    // Confirm against the full file.
1898 peerCounts[closedLedger] = 0;
    // Our own vote counts only if we believe we are synced.
1899 if (mMode >= OperatingMode::TRACKING)
1900 peerCounts[closedLedger]++;
1901
    // Tally each peer's reported last closed ledger.
1902 for (auto& peer : peerList)
1903 {
1904 uint256 peerLedger = peer->getClosedLedgerHash();
1905
1906 if (peerLedger.isNonZero())
1907 ++peerCounts[peerLedger];
1908 }
1909
1910 for (auto const& it : peerCounts)
1911 JLOG(m_journal.debug()) << "L: " << it.first << " n=" << it.second;
1912
    // Ask the validation trie which LCL is preferred given trusted
    // validations, falling back on the peer tallies.
1913 uint256 preferredLCL = validations.getPreferredLCL(
1914 RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
1915 m_ledgerMaster.getValidLedgerIndex(),
1916 peerCounts);
1917
1918 bool switchLedgers = preferredLCL != closedLedger;
1919 if (switchLedgers)
1920 closedLedger = preferredLCL;
1921 //-------------------------------------------------------------------------
1922 if (switchLedgers && (closedLedger == prevClosedLedger))
1923 {
1924 // don't switch to our own previous ledger
1925 JLOG(m_journal.info()) << "We won't switch to our own previous ledger";
1926 networkClosed = ourClosed->info().hash;
1927 switchLedgers = false;
1928 }
1929 else
1930 networkClosed = closedLedger;
1931
1932 if (!switchLedgers)
1933 return false;
1934
    // Try locally first; otherwise start acquiring the preferred ledger.
1935 auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
1936
1937 if (!consensus)
1938 consensus = app_.getInboundLedgers().acquire(
1939 closedLedger, 0, InboundLedger::Reason::CONSENSUS);
1940
1941 if (consensus &&
1942 (!m_ledgerMaster.canBeCurrent(consensus) ||
1943 !m_ledgerMaster.isCompatible(
1944 *consensus, m_journal.debug(), "Not switching")))
1945 {
1946 // Don't switch to a ledger not on the validated chain
1947 // or with an invalid close time or sequence
1948 networkClosed = ourClosed->info().hash;
1949 return false;
1950 }
1951
1952 JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
1953 JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash
1954 << getJson({*ourClosed, {}});
1955 JLOG(m_journal.info()) << "Net LCL " << closedLedger;
1956
    // We are off the network's ledger: stop claiming to be synced.
1957 if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
1958 {
1959 setMode(OperatingMode::CONNECTED);
1960 }
1961
1962 if (consensus)
1963 {
1964 // FIXME: If this rewinds the ledger sequence, or has the same
1965 // sequence, we should update the status on any stored transactions
1966 // in the invalidated ledgers.
1967 switchLastClosedLedger(consensus);
1968 }
1969
1970 return true;
1971}
1972
// Abnormal-path "jump": adopt newLCL as our last closed ledger, rebuild
// the open ledger on top of it, and announce the switch to peers.
1973void
1974NetworkOPsImp::switchLastClosedLedger(
1975 std::shared_ptr<Ledger const> const& newLCL)
1976{
1977 // set the newLCL as our last closed ledger -- this is abnormal code
1978 JLOG(m_journal.error())
1979 << "JUMP last closed ledger to " << newLCL->info().hash;
1980
1981 clearNeedNetworkLedger();
1982
1983 // Update fee computations.
1984 app_.getTxQ().processClosedLedger(app_, *newLCL, true);
1985
1986 // Caller must own master lock
1987 {
1988 // Apply tx in old open ledger to new
1989 // open ledger. Then apply local tx.
1990
1991 auto retries = m_localTX->getTxSet();
1992 auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
        // NOTE(review): the declaration of `rules` appears to be missing
        // from this extract (likely std::optional<Rules>). Confirm
        // against the full file.
1994 if (lastVal)
1995 rules = makeRulesGivenLedger(*lastVal, app_.config().features);
1996 else
1997 rules.emplace(app_.config().features);
1998 app_.openLedger().accept(
1999 app_,
2000 *rules,
2001 newLCL,
2002 OrderedTxs({}),
2003 false,
2004 retries,
2005 tapNONE,
2006 "jump",
2007 [&](OpenView& view, beast::Journal j) {
2008 // Stuff the ledger with transactions from the queue.
2009 return app_.getTxQ().accept(app_, view);
2010 });
2011 }
2012
2013 m_ledgerMaster.switchLCL(newLCL);
2014
    // Tell peers we switched, including old/new ledger identifiers.
2015 protocol::TMStatusChange s;
2016 s.set_newevent(protocol::neSWITCHED_LEDGER);
2017 s.set_ledgerseq(newLCL->info().seq);
2018 s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
2019 s.set_ledgerhashprevious(
2020 newLCL->info().parentHash.begin(), newLCL->info().parentHash.size());
2021 s.set_ledgerhash(newLCL->info().hash.begin(), newLCL->info().hash.size());
2022
2023 app_.overlay().foreach(
2024 send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
2025}
2026
// Start a consensus round on top of the current open ledger's parent.
// Returns false if we lack the previous ledger (we cannot participate).
2027bool
2028NetworkOPsImp::beginConsensus(
2029 uint256 const& networkClosed,
    // NOTE(review): the second parameter line appears to be missing from
    // this extract (a `clog` stream, judging by the CLOG uses below).
2031{
2032 XRPL_ASSERT(
2033 networkClosed.isNonZero(),
2034 "ripple::NetworkOPsImp::beginConsensus : nonzero input");
2035
2036 auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
2037
2038 JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
2039 << " with LCL " << closingInfo.parentHash;
2040
2041 auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
2042
2043 if (!prevLedger)
2044 {
2045 // this shouldn't happen unless we jump ledgers
2046 if (mMode == OperatingMode::FULL)
2047 {
2048 JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
2049 setMode(OperatingMode::TRACKING);
2050 CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
2051 }
2052
2053 CLOG(clog) << "beginConsensus no previous ledger. ";
2054 return false;
2055 }
2056
2057 XRPL_ASSERT(
2058 prevLedger->info().hash == closingInfo.parentHash,
2059 "ripple::NetworkOPsImp::beginConsensus : prevLedger hash matches "
2060 "parent");
2061 XRPL_ASSERT(
2062 closingInfo.parentHash == m_ledgerMaster.getClosedLedger()->info().hash,
2063 "ripple::NetworkOPsImp::beginConsensus : closedLedger parent matches "
2064 "hash");
2065
    // Refresh the trusted validator set for this round from the previous
    // ledger's negative UNL and current validations.
2066 app_.validators().setNegativeUNL(prevLedger->negativeUNL());
2067 TrustChanges const changes = app_.validators().updateTrusted(
2068 app_.getValidations().getCurrentNodeIDs(),
2069 closingInfo.parentCloseTime,
2070 *this,
2071 app_.overlay(),
2072 app_.getHashRouter());
2073
2074 if (!changes.added.empty() || !changes.removed.empty())
2075 {
2076 app_.getValidations().trustChanged(changes.added, changes.removed);
2077 // Update the AmendmentTable so it tracks the current validators.
2078 app_.getAmendmentTable().trustChanged(
2079 app_.validators().getQuorumKeys().second);
2080 }
2081
2082 mConsensus.startRound(
2083 app_.timeKeeper().closeTime(),
2084 networkClosed,
2085 prevLedger,
2086 changes.removed,
2087 changes.added,
2088 clog);
2089
    // Surface a phase transition to subscribers, if one happened.
2090 ConsensusPhase const currPhase = mConsensus.phase();
2091 if (mLastConsensusPhase != currPhase)
2092 {
2093 reportConsensusStateChange(currPhase);
2094 mLastConsensusPhase = currPhase;
2095 }
2096
2097 JLOG(m_journal.debug()) << "Initiating consensus engine";
2098 return true;
2099}
2100
2101bool
2102NetworkOPsImp::processTrustedProposal(RCLCxPeerPos peerPos)
2103{
2104 auto const& peerKey = peerPos.publicKey();
2105 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2106 {
2107 // Could indicate a operator misconfiguration where two nodes are
2108 // running with the same validator key configured, so this isn't fatal,
2109 // and it doesn't necessarily indicate peer misbehavior. But since this
2110 // is a trusted message, it could be a very big deal. Either way, we
2111 // don't want to relay the proposal. Note that the byzantine behavior
2112 // detection in handleNewValidation will notify other peers.
2113 //
2114 // Another, innocuous explanation is unusual message routing and delays,
2115 // causing this node to receive its own messages back.
2116 JLOG(m_journal.error())
2117 << "Received a proposal signed by MY KEY from a peer. This may "
2118 "indicate a misconfiguration where another node has the same "
2119 "validator key, or may be caused by unusual message routing and "
2120 "delays.";
2121 return false;
2122 }
2123
2124 return mConsensus.peerProposal(app_.timeKeeper().closeTime(), peerPos);
2125}
2126
2127void
2128NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
2129{
2130 // We now have an additional transaction set
2131 // either created locally during the consensus process
2132 // or acquired from a peer
2133
2134 // Inform peers we have this set
2135 protocol::TMHaveTransactionSet msg;
2136 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
2137 msg.set_status(protocol::tsHAVE);
2138 app_.overlay().foreach(
2139 send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
2140
2141 // We acquired it because consensus asked us to
2142 if (fromAcquire)
2143 mConsensus.gotTxSet(app_.timeKeeper().closeTime(), RCLTxSet{map});
2144}
2145
// Wrap up a consensus round: clear obsolete peer status, re-evaluate the
// network's preferred last closed ledger, possibly promote our operating
// mode, and begin the next round. Statement order matters here: the
// TRACKING promotion feeds the condition for the FULL promotion below.
2146void
2147NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
2148{
2149 uint256 deadLedger = m_ledgerMaster.getClosedLedger()->info().parentHash;
2150
    // Peers still reporting our previous LCL have stale status.
2151 for (auto const& it : app_.overlay().getActivePeers())
2152 {
2153 if (it && (it->getClosedLedgerHash() == deadLedger))
2154 {
2155 JLOG(m_journal.trace()) << "Killing obsolete peer status";
2156 it->cycleStatus();
2157 }
2158 }
2159
2160 uint256 networkClosed;
2161 bool ledgerChange =
2162 checkLastClosedLedger(app_.overlay().getActivePeers(), networkClosed);
2163
2164 if (networkClosed.isZero())
2165 {
2166 CLOG(clog) << "endConsensus last closed ledger is zero. ";
2167 return;
2168 }
2169
2170 // WRITEME: Unless we are in FULL and in the process of doing a consensus,
2171 // we must count how many nodes share our LCL, how many nodes disagree with
2172 // our LCL, and how many validations our LCL has. We also want to check
2173 // timing to make sure there shouldn't be a newer LCL. We need this
2174 // information to do the next three tests.
2175
2176 if (((mMode == OperatingMode::CONNECTED) ||
2177 (mMode == OperatingMode::SYNCING)) &&
2178 !ledgerChange)
2179 {
2180 // Count number of peers that agree with us and UNL nodes whose
2181 // validations we have for LCL. If the ledger is good enough, go to
2182 // TRACKING - TODO
2183 if (!needNetworkLedger_)
2184 setMode(OperatingMode::TRACKING);
2185 }
2186
2187 if (((mMode == OperatingMode::CONNECTED) ||
2188 (mMode == OperatingMode::TRACKING)) &&
2189 !ledgerChange)
2190 {
2191 // check if the ledger is good enough to go to FULL
2192 // Note: Do not go to FULL if we don't have the previous ledger
2193 // check if the ledger is bad enough to go to CONNECTE D -- TODO
2194 auto current = m_ledgerMaster.getCurrentLedger();
        // Only claim FULL if the current ledger's close time is recent
        // (within twice the close-time resolution of its parent close).
2195 if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2196 2 * current->info().closeTimeResolution))
2197 {
2198 setMode(OperatingMode::FULL);
2199 }
2200 }
2201
2202 beginConsensus(networkClosed, clog);
2203}
2204
2205void
2206NetworkOPsImp::consensusViewChange()
2207{
2208 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2209 {
2210 setMode(OperatingMode::CONNECTED);
2211 }
2212}
2213
// Publish a received validator manifest to "manifests" stream
// subscribers, pruning any subscribers that have gone away.
2214void
2215NetworkOPsImp::pubManifest(Manifest const& mo)
2216{
2217 // VFALCO consider std::shared_mutex
2218 std::lock_guard sl(mSubLock);
2219
2220 if (!mStreamMaps[sManifests].empty())
2221 {
        // NOTE(review): the declaration of `jvObj` appears to be missing
        // from this extract (likely Json::Value(objectValue)). Confirm
        // against the full file.
2223
2224 jvObj[jss::type] = "manifestReceived";
2225 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
2226 if (mo.signingKey)
2227 jvObj[jss::signing_key] =
2228 toBase58(TokenType::NodePublic, *mo.signingKey);
2229 jvObj[jss::seq] = Json::UInt(mo.sequence);
2230 if (auto sig = mo.getSignature())
2231 jvObj[jss::signature] = strHex(*sig);
2232 jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
2233 if (!mo.domain.empty())
2234 jvObj[jss::domain] = mo.domain;
2235 jvObj[jss::manifest] = strHex(mo.serialized);
2236
        // Send to each live subscriber; erase subscribers whose weak
        // pointers have expired.
2237 for (auto i = mStreamMaps[sManifests].begin();
2238 i != mStreamMaps[sManifests].end();)
2239 {
2240 if (auto p = i->second.lock())
2241 {
2242 p->send(jvObj, true);
2243 ++i;
2244 }
2245 else
2246 {
2247 i = mStreamMaps[sManifests].erase(i);
2248 }
2249 }
2250 }
2251}
2252
// Snapshot the values driving the "serverStatus" stream: the open
// ledger's base fee, the TxQ's escalation metrics, and the server-load
// fee factors from the LoadFeeTrack.
2253NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
2254 XRPAmount fee,
2255 TxQ::Metrics&& escalationMetrics,
2256 LoadFeeTrack const& loadFeeTrack)
2257 : loadFactorServer{loadFeeTrack.getLoadFactor()}
2258 , loadBaseServer{loadFeeTrack.getLoadBase()}
2259 , baseFee{fee}
2260 , em{std::move(escalationMetrics)}
2261{
2262}
2263
// Inequality for fee summaries; used to decide whether a fee change is
// worth publishing to subscribers.
// NOTE(review): the signature line appears to be missing from this
// extract (likely NetworkOPsImp::ServerFeeSummary::operator!=(...)).
2264bool
2266 NetworkOPsImp::ServerFeeSummary const& b) const
2267{
    // Differ if any scalar field differs, or if only one side carries
    // escalation metrics.
2268 if (loadFactorServer != b.loadFactorServer ||
2269 loadBaseServer != b.loadBaseServer || baseFee != b.baseFee ||
2270 em.has_value() != b.em.has_value())
2271 return true;
2272
    // Both have metrics: compare the individual fee levels.
2273 if (em && b.em)
2274 {
2275 return (
2276 em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
2277 em->openLedgerFeeLevel != b.em->openLedgerFeeLevel ||
2278 em->referenceFeeLevel != b.em->referenceFeeLevel);
2279 }
2280
    // Neither has metrics and all scalars matched.
2281 return false;
2282}
2283
// Clamp a uint64 value into uint32 range (JSON cannot represent the full
// 64-bit range).
2285static std::uint32_t
// NOTE(review): the parameter list appears to be missing from this
// extract (likely `trunc32(std::uint64_t v)`), as does the declaration
// of `max32` used below. Confirm against the full file.
2287{
2289
2290 return std::min(max32, v);
2291};
2292
// Publish current fee/load status to "server" stream subscribers when
// anything changed, pruning expired subscribers.
// NOTE(review): several lines appear to be missing from this extract:
// the signature (likely NetworkOPsImp::pubServer()), the subscription
// lock, the declaration of `jvObj`, and parts of the `f` summary
// construction and the mulDiv fallback. Confirm against the full file.
2293void
2295{
2296 // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
2297 // list into a local array while holding the lock then release
2298 // the lock and call send on everyone.
2299 //
2301
2302 if (!mStreamMaps[sServer].empty())
2303 {
2305
2307 app_.openLedger().current()->fees().base,
2309 app_.getFeeTrack()};
2310
2311 jvObj[jss::type] = "serverStatus";
2312 jvObj[jss::server_status] = strOperatingMode();
2313 jvObj[jss::load_base] = f.loadBaseServer;
2314 jvObj[jss::load_factor_server] = f.loadFactorServer;
2315 jvObj[jss::base_fee] = f.baseFee.jsonClipped();
2316
        // With escalation metrics, report the larger of the server load
        // factor and the escalated open-ledger fee level.
2317 if (f.em)
2318 {
2319 auto const loadFactor = std::max(
2320 safe_cast<std::uint64_t>(f.loadFactorServer),
2321 mulDiv(
2322 f.em->openLedgerFeeLevel,
2323 f.loadBaseServer,
2324 f.em->referenceFeeLevel)
2326
2327 jvObj[jss::load_factor] = trunc32(loadFactor);
2328 jvObj[jss::load_factor_fee_escalation] =
2329 f.em->openLedgerFeeLevel.jsonClipped();
2330 jvObj[jss::load_factor_fee_queue] =
2331 f.em->minProcessingFeeLevel.jsonClipped();
2332 jvObj[jss::load_factor_fee_reference] =
2333 f.em->referenceFeeLevel.jsonClipped();
2334 }
2335 else
2336 jvObj[jss::load_factor] = f.loadFactorServer;
2337
        // Remember what we published so callers can diff against it.
2338 mLastFeeSummary = f;
2339
2340 for (auto i = mStreamMaps[sServer].begin();
2341 i != mStreamMaps[sServer].end();)
2342 {
2343 InfoSub::pointer p = i->second.lock();
2344
2345 // VFALCO TODO research the possibility of using thread queues and
2346 // linearizing the deletion of subscribers with the
2347 // sending of JSON data.
2348 if (p)
2349 {
2350 p->send(jvObj, true);
2351 ++i;
2352 }
2353 else
2354 {
2355 i = mStreamMaps[sServer].erase(i);
2356 }
2357 }
2358 }
2359}
2360
// Publish a consensus phase change to "consensus" stream subscribers,
// pruning expired subscribers.
// NOTE(review): the signature line appears to be missing from this
// extract (a function taking the consensus `phase`, used below), as are
// the subscription lock and the `jvObj` declaration. Confirm against the
// full file.
2361void
2363{
2365
2366 auto& streamMap = mStreamMaps[sConsensusPhase];
2367 if (!streamMap.empty())
2368 {
2370 jvObj[jss::type] = "consensusPhase";
2371 jvObj[jss::consensus] = to_string(phase);
2372
2373 for (auto i = streamMap.begin(); i != streamMap.end();)
2374 {
2375 if (auto p = i->second.lock())
2376 {
2377 p->send(jvObj, true);
2378 ++i;
2379 }
2380 else
2381 {
2382 i = streamMap.erase(i);
2383 }
2384 }
2385 }
2386}
2387
// Publish a received validation to "validations" stream subscribers,
// with per-API-version formatting and pruning of expired subscribers.
// NOTE(review): the signature line appears to be missing from this
// extract (a function taking `val`, evidently a validation message), as
// are the subscription lock and the `jvObj` declaration. Confirm against
// the full file.
2388void
2390{
2391 // VFALCO consider std::shared_mutex
2393
2394 if (!mStreamMaps[sValidations].empty())
2395 {
2397
2398 auto const signerPublic = val->getSignerPublic();
2399
2400 jvObj[jss::type] = "validationReceived";
2401 jvObj[jss::validation_public_key] =
2402 toBase58(TokenType::NodePublic, signerPublic);
2403 jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
2404 jvObj[jss::signature] = strHex(val->getSignature());
2405 jvObj[jss::full] = val->isFull();
2406 jvObj[jss::flags] = val->getFlags();
2407 jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
2408 jvObj[jss::data] = strHex(val->getSerializer().slice());
2409 jvObj[jss::network_id] = app_.config().NETWORK_ID;
2410
        // Optional fields: only emitted when present in the validation.
2411 if (auto version = (*val)[~sfServerVersion])
2412 jvObj[jss::server_version] = std::to_string(*version);
2413
2414 if (auto cookie = (*val)[~sfCookie])
2415 jvObj[jss::cookie] = std::to_string(*cookie);
2416
2417 if (auto hash = (*val)[~sfValidatedHash])
2418 jvObj[jss::validated_hash] = strHex(*hash);
2419
2420 auto const masterKey =
2421 app_.validatorManifests().getMasterKey(signerPublic);
2422
        // Only report a master key if it differs from the signing key.
2423 if (masterKey != signerPublic)
2424 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);
2425
2426 // NOTE *seq is a number, but old API versions used string. We replace
2427 // number with a string using MultiApiJson near end of this function
2428 if (auto const seq = (*val)[~sfLedgerSequence])
2429 jvObj[jss::ledger_index] = *seq;
2430
2431 if (val->isFieldPresent(sfAmendments))
2432 {
2433 jvObj[jss::amendments] = Json::Value(Json::arrayValue);
2434 for (auto const& amendment : val->getFieldV256(sfAmendments))
2435 jvObj[jss::amendments].append(to_string(amendment));
2436 }
2437
2438 if (auto const closeTime = (*val)[~sfCloseTime])
2439 jvObj[jss::close_time] = *closeTime;
2440
2441 if (auto const loadFee = (*val)[~sfLoadFee])
2442 jvObj[jss::load_fee] = *loadFee;
2443
2444 if (auto const baseFee = val->at(~sfBaseFee))
2445 jvObj[jss::base_fee] = static_cast<double>(*baseFee);
2446
2447 if (auto const reserveBase = val->at(~sfReserveBase))
2448 jvObj[jss::reserve_base] = *reserveBase;
2449
2450 if (auto const reserveInc = val->at(~sfReserveIncrement))
2451 jvObj[jss::reserve_inc] = *reserveInc;
2452
2453 // (The ~ operator converts the Proxy to a std::optional, which
2454 // simplifies later operations)
        // The *Drops variants, when native XRP, override the legacy
        // fields set above.
2455 if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops);
2456 baseFeeXRP && baseFeeXRP->native())
2457 jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();
2458
2459 if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
2460 reserveBaseXRP && reserveBaseXRP->native())
2461 jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();
2462
2463 if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
2464 reserveIncXRP && reserveIncXRP->native())
2465 jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();
2466
2467 // NOTE Use MultiApiJson to publish two slightly different JSON objects
2468 // for consumers supporting different API versions
2469 MultiApiJson multiObj{jvObj};
2470 multiObj.visit(
2471 RPC::apiVersion<1>, //
2472 [](Json::Value& jvTx) {
2473 // Type conversion for older API versions to string
2474 if (jvTx.isMember(jss::ledger_index))
2475 {
2476 jvTx[jss::ledger_index] =
2477 std::to_string(jvTx[jss::ledger_index].asUInt());
2478 }
2479 });
2480
        // Deliver the version-appropriate object to each subscriber.
2481 for (auto i = mStreamMaps[sValidations].begin();
2482 i != mStreamMaps[sValidations].end();)
2483 {
2484 if (auto p = i->second.lock())
2485 {
2486 multiObj.visit(
2487 p->getApiVersion(), //
2488 [&](Json::Value const& jv) { p->send(jv, true); });
2489 ++i;
2490 }
2491 else
2492 {
2493 i = mStreamMaps[sValidations].erase(i);
2494 }
2495 }
2496 }
2497}
2498
// Publish a peer status change to "peer_status" stream subscribers. The
// JSON payload is produced lazily by `func` (only when subscribers
// exist), then tagged with the event type.
// NOTE(review): the signature line appears to be missing from this
// extract (a function taking the callable `func` invoked below), as is
// the subscription lock. Confirm against the full file.
2499void
2501{
2503
2504 if (!mStreamMaps[sPeerStatus].empty())
2505 {
2506 Json::Value jvObj(func());
2507
2508 jvObj[jss::type] = "peerStatusChange";
2509
        // Send to each live subscriber; prune expired ones.
2510 for (auto i = mStreamMaps[sPeerStatus].begin();
2511 i != mStreamMaps[sPeerStatus].end();)
2512 {
2513 InfoSub::pointer p = i->second.lock();
2514
2515 if (p)
2516 {
2517 p->send(jvObj, true);
2518 ++i;
2519 }
2520 else
2521 {
2522 i = mStreamMaps[sPeerStatus].erase(i);
2523 }
2524 }
2525 }
2526}
2527
// Change the operating mode, with side effects: accounting is updated,
// the change is logged, and the new status is published to subscribers.
// NOTE(review): several lines appear to be missing from this extract:
// the signature (taking the target mode `om`), the bodies of the two
// timer-related branches below, and the statement demoting `om` when the
// server is blocked. Confirm against the full file.
2528void
2530{
2531 using namespace std::chrono_literals;
2532 if (om == OperatingMode::CONNECTED)
2533 {
2536 }
2537 else if (om == OperatingMode::SYNCING)
2538 {
2541 }
2542
    // A blocked server is never allowed to advertise a synced mode.
2543 if ((om > OperatingMode::CONNECTED) && isBlocked())
2545
2546 if (mMode == om)
2547 return;
2548
2549 mMode = om;
2550
2551 accounting_.mode(om);
2552
2553 JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
2554 pubServer();
2555}
2556
// Handle a validation received from `source`: de-duplicate concurrent
// handling of the same ledger hash, process it, publish it to stream
// subscribers, and report whether it should be relayed.
// NOTE(review): lines appear to be missing from this extract: the first
// signature line (taking `val`, evidently the validation), the lock
// acquisition guarding pendingValidations_, and the declaration of the
// stringstream `ss` used in the log lambda. Confirm against the full
// file.
2557bool
2560 std::string const& source)
2561{
2562 JLOG(m_journal.trace())
2563 << "recvValidation " << val->getLedgerHash() << " from " << source;
2566 BypassAccept bypassAccept = BypassAccept::no;
2567 try
2568 {
        // If this ledger hash is already being handled, skip the accept
        // step for this copy.
2569 if (pendingValidations_.contains(val->getLedgerHash()))
2570 bypassAccept = BypassAccept::yes;
2571 else
2572 pendingValidations_.insert(val->getLedgerHash());
        // Release the lock while the (potentially slow) handling runs.
2573 scope_unlock unlock(lock);
2574 handleNewValidation(app_, val, source, bypassAccept, m_journal);
2575 }
2576 catch (std::exception const& e)
2577 {
2578 JLOG(m_journal.warn())
2579 << "Exception thrown for handling new validation "
2580 << val->getLedgerHash() << ": " << e.what();
2581 }
2582 catch (...)
2583 {
2584 JLOG(m_journal.warn())
2585 << "Unknown exception thrown for handling new validation "
2586 << val->getLedgerHash();
2587 }
    // Only the copy that inserted the pending entry removes it.
2588 if (bypassAccept == BypassAccept::no)
2589 {
2590 pendingValidations_.erase(val->getLedgerHash());
2591 }
2592 lock.unlock();
2593
2594 pubValidation(val);
2595
2596 JLOG(m_journal.debug()) << [this, &val]() -> auto {
2598 ss << "VALIDATION: " << val->render() << " master_key: ";
2599 auto master = app_.validators().getTrustedKey(val->getSignerPublic());
2600 if (master)
2601 {
2602 ss << toBase58(TokenType::NodePublic, *master);
2603 }
2604 else
2605 {
2606 ss << "none";
2607 }
2608 return ss.str();
2609 }();
2610
2611 // We will always relay trusted validations; if configured, we will
2612 // also relay all untrusted validations.
2613 return app_.config().RELAY_UNTRUSTED_VALIDATIONS == 1 || val->isTrusted();
2614}
2615
// NOTE(review): the return type and signature lines for this function
// (getConsensusInfo, per surrounding context) appear to be missing from
// this extract. The body returns the consensus engine's full JSON state.
2618{
2619 return mConsensus.getJson(true);
2620}
2621
// Build the server_info / server_state result object.
//
// human    true  => human-readable units (XRP decimals, ratios, struct form)
//          false => raw machine-readable counts
// admin    include admin-only data (node size, local keys, load, git info)
// counters include perf-log counters and nodestore counts
//
// NOTE(review): this is a scraped listing; several original lines are elided
// (flagged inline below). In particular the declaration of the `info` object
// (original line 2625) that every branch populates is missing.
2623 NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2624 {
2626
2627     // System-level warnings
2628     {
2629         Json::Value warnings{Json::arrayValue};
2630         if (isAmendmentBlocked())
2631         {
2632             Json::Value& w = warnings.append(Json::objectValue);
2633             w[jss::id] = warnRPC_AMENDMENT_BLOCKED;
2634             w[jss::message] =
2635                 "This server is amendment blocked, and must be updated to be "
2636                 "able to stay in sync with the network.";
2637         }
2638         if (isUNLBlocked())
2639         {
2640             Json::Value& w = warnings.append(Json::objectValue);
2641             w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
2642             w[jss::message] =
2643                 "This server has an expired validator list. validators.txt "
2644                 "may be incorrectly configured or some [validator_list_sites] "
2645                 "may be unreachable.";
2646         }
2647         if (admin && isAmendmentWarned())
2648         {
2649             Json::Value& w = warnings.append(Json::objectValue);
2650             w[jss::id] = warnRPC_UNSUPPORTED_MAJORITY;
2651             w[jss::message] =
2652                 "One or more unsupported amendments have reached majority. "
2653                 "Upgrade to the latest version before they are activated "
2654                 "to avoid being amendment blocked.";
            // NOTE(review): original line 2656 is elided — the right-hand
            // side of this `if (auto const expected = ...)` initializer
            // (apparently an expected activation time, see `details` below).
2655             if (auto const expected =
2657             {
2658                 auto& d = w[jss::details] = Json::objectValue;
2659                 d[jss::expected_date] = expected->time_since_epoch().count();
2660                 d[jss::expected_date_UTC] = to_string(*expected);
2661             }
2662         }
2663
2664         if (warnings.size())
2665             info[jss::warnings] = std::move(warnings);
2666     }
2667
2668     // hostid: unique string describing the machine
2669     if (human)
2670         info[jss::hostid] = getHostId(admin);
2671
2672     // domain: if configured with a domain, report it:
2673     if (!app_.config().SERVER_DOMAIN.empty())
2674         info[jss::server_domain] = app_.config().SERVER_DOMAIN;
2675
2676     info[jss::build_version] = BuildInfo::getVersionString();
2677
2678     info[jss::server_state] = strOperatingMode(admin);
2679
    // NOTE(review): original lines 2681 and 2683 are elided — presumably the
    // clock argument closing this to_string() call and the not-yet-synced
    // guard that makes the "waiting" assignment conditional. Confirm.
2680     info[jss::time] = to_string(std::chrono::floor<std::chrono::microseconds>(
2682
2684         info[jss::network_ledger] = "waiting";
2685
2686     info[jss::validation_quorum] =
2687         static_cast<Json::UInt>(app_.validators().quorum());
2688
2689     if (admin)
2690     {
2691         switch (app_.config().NODE_SIZE)
2692         {
2693             case 0:
2694                 info[jss::node_size] = "tiny";
2695                 break;
2696             case 1:
2697                 info[jss::node_size] = "small";
2698                 break;
2699             case 2:
2700                 info[jss::node_size] = "medium";
2701                 break;
2702             case 3:
2703                 info[jss::node_size] = "large";
2704                 break;
2705             case 4:
2706                 info[jss::node_size] = "huge";
2707                 break;
2708         }
2709
2710         auto when = app_.validators().expires();
2711
2712         if (!human)
2713         {
2714             if (when)
2715                 info[jss::validator_list_expires] =
2716                     safe_cast<Json::UInt>(when->time_since_epoch().count());
2717             else
2718                 info[jss::validator_list_expires] = 0;
2719         }
2720         else
2721         {
2722             auto& x = (info[jss::validator_list] = Json::objectValue);
2723
2724             x[jss::count] = static_cast<Json::UInt>(app_.validators().count());
2725
2726             if (when)
2727             {
                // time_point::max() is the sentinel for "never expires".
2728                 if (*when == TimeKeeper::time_point::max())
2729                 {
2730                     x[jss::expiration] = "never";
2731                     x[jss::status] = "active";
2732                 }
2733                 else
2734                 {
2735                     x[jss::expiration] = to_string(*when);
2736
2737                     if (*when > app_.timeKeeper().now())
2738                         x[jss::status] = "active";
2739                     else
2740                         x[jss::status] = "expired";
2741                 }
2742             }
2743             else
2744             {
2745                 x[jss::status] = "unknown";
2746                 x[jss::expiration] = "unknown";
2747             }
2748         }
2749
2750#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2751         {
2752             auto& x = (info[jss::git] = Json::objectValue);
2753#ifdef GIT_COMMIT_HASH
2754             x[jss::hash] = GIT_COMMIT_HASH;
2755#endif
2756#ifdef GIT_BRANCH
2757             x[jss::branch] = GIT_BRANCH;
2758#endif
2759         }
2760#endif
2761     }
2762     info[jss::io_latency_ms] =
2763         static_cast<Json::UInt>(app_.getIOLatency().count());
2764
2765     if (admin)
2766     {
2767         if (auto const localPubKey = app_.validators().localPublicKey();
2768             localPubKey && app_.getValidationPublicKey())
2769         {
2770             info[jss::pubkey_validator] =
2771                 toBase58(TokenType::NodePublic, localPubKey.value())
2772         }
2773         else
2774         {
2775             info[jss::pubkey_validator] = "none";
2776         }
2777     }
2778
2779     if (counters)
2780     {
2781         info[jss::counters] = app_.getPerfLog().countersJson();
2782
2783         Json::Value nodestore(Json::objectValue);
2784         app_.getNodeStore().getCountsJson(nodestore);
2785         info[jss::counters][jss::nodestore] = nodestore;
2786         info[jss::current_activities] = app_.getPerfLog().currentJson();
2787     }
2788
    // NOTE(review): original line 2790 is elided — the node public key
    // expression assigned here. Confirm against the repository.
2789     info[jss::pubkey_node] =
2791
2792     info[jss::complete_ledgers] = app_.getLedgerMaster().getCompleteLedgers();
2793
    // NOTE(review): original line 2794 is elided — the indentation implies
    // this assignment is guarded, presumably by an amendment-blocked check.
2795         info[jss::amendment_blocked] = true;
2796
2797     auto const fp = m_ledgerMaster.getFetchPackCacheSize();
2798
2799     if (fp != 0)
2800         info[jss::fetch_pack] = Json::UInt(fp);
2801
2802     info[jss::peers] = Json::UInt(app_.overlay().size());
2803
2804     Json::Value lastClose = Json::objectValue;
2805     lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
2806
2807     if (human)
2808     {
        // NOTE(review): original lines 2810 and 2815 are elided — the
        // converge-time expressions assigned in these two branches.
2809         lastClose[jss::converge_time_s] =
2811     }
2812     else
2813     {
2814         lastClose[jss::converge_time] =
2816     }
2817
2818     info[jss::last_close] = lastClose;
2819
2820     // info[jss::consensus] = mConsensus.getJson();
2821
2822     if (admin)
2823         info[jss::load] = m_job_queue.getJson();
2824
2825     if (auto const netid = app_.overlay().networkID())
2826         info[jss::network_id] = static_cast<Json::UInt>(*netid);
2827
    // NOTE(review): original line 2829 is elided — the TxQ metrics source for
    // escalationMetrics (used extensively below). Confirm.
2828     auto const escalationMetrics =
2830
2831     auto const loadFactorServer = app_.getFeeTrack().getLoadFactor();
2832     auto const loadBaseServer = app_.getFeeTrack().getLoadBase();
2833     /* Scale the escalated fee level to unitless "load factor".
2834        In practice, this just strips the units, but it will continue
2835        to work correctly if either base value ever changes. */
    // NOTE(review): original line 2841 is elided — presumably the fallback /
    // terminator of this mulDiv expression (it currently has no semicolon).
2836     auto const loadFactorFeeEscalation =
2837         mulDiv(
2838             escalationMetrics.openLedgerFeeLevel,
2839             loadBaseServer,
2840             escalationMetrics.referenceFeeLevel)
2842
2843     auto const loadFactor = std::max(
2844         safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2845
2846     if (!human)
2847     {
2848         info[jss::load_base] = loadBaseServer;
2849         info[jss::load_factor] = trunc32(loadFactor);
2850         info[jss::load_factor_server] = loadFactorServer;
2851
2852         /* Json::Value doesn't support uint64, so clamp to max
2853            uint32 value. This is mostly theoretical, since there
2854            probably isn't enough extant XRP to drive the factor
2855            that high.
2856         */
2857         info[jss::load_factor_fee_escalation] =
2858             escalationMetrics.openLedgerFeeLevel.jsonClipped();
2859         info[jss::load_factor_fee_queue] =
2860             escalationMetrics.minProcessingFeeLevel.jsonClipped();
2861         info[jss::load_factor_fee_reference] =
2862             escalationMetrics.referenceFeeLevel.jsonClipped();
2863     }
2864     else
2865     {
2866         info[jss::load_factor] =
2867             static_cast<double>(loadFactor) / loadBaseServer;
2868
2869         if (loadFactorServer != loadFactor)
2870             info[jss::load_factor_server] =
2871                 static_cast<double>(loadFactorServer) / loadBaseServer;
2872
2873         if (admin)
2874         {
            // NOTE(review): original line 2875 is elided — presumably the
            // declaration of `fee` (local fee level) reassigned below.
2876             if (fee != loadBaseServer)
2877                 info[jss::load_factor_local] =
2878                     static_cast<double>(fee) / loadBaseServer;
2879             fee = app_.getFeeTrack().getRemoteFee();
2880             if (fee != loadBaseServer)
2881                 info[jss::load_factor_net] =
2882                     static_cast<double>(fee) / loadBaseServer;
2883             fee = app_.getFeeTrack().getClusterFee();
2884             if (fee != loadBaseServer)
2885                 info[jss::load_factor_cluster] =
2886                     static_cast<double>(fee) / loadBaseServer;
2887         }
2888         if (escalationMetrics.openLedgerFeeLevel !=
2889                 escalationMetrics.referenceFeeLevel &&
2890             (admin || loadFactorFeeEscalation != loadFactor))
2891             info[jss::load_factor_fee_escalation] =
2892                 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2893                     escalationMetrics.referenceFeeLevel);
2894         if (escalationMetrics.minProcessingFeeLevel !=
2895             escalationMetrics.referenceFeeLevel)
2896             info[jss::load_factor_fee_queue] =
2897                 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2898                     escalationMetrics.referenceFeeLevel);
2899     }
2900
    // Prefer the validated ledger; fall back to the last closed ledger and
    // remember which one we are describing via `valid`.
2901     bool valid = false;
2902     auto lpClosed = m_ledgerMaster.getValidatedLedger();
2903
2904     if (lpClosed)
2905         valid = true;
2906     else
2907         lpClosed = m_ledgerMaster.getClosedLedger();
2908
2909     if (lpClosed)
2910     {
2911         XRPAmount const baseFee = lpClosed->fees().base;
        // NOTE(review): original line 2912 is elided — presumably the
        // declaration of the ledger-summary object `l` populated below.
2913         l[jss::seq] = Json::UInt(lpClosed->info().seq);
2914         l[jss::hash] = to_string(lpClosed->info().hash);
2915
2916         if (!human)
2917         {
2918             l[jss::base_fee] = baseFee.jsonClipped();
2919             l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2920             l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2921             l[jss::close_time] = Json::Value::UInt(
2922                 lpClosed->info().closeTime.time_since_epoch().count());
2923         }
2924         else
2925         {
2926             l[jss::base_fee_xrp] = baseFee.decimalXRP();
2927             l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2928             l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2929
2930             if (auto const closeOffset = app_.timeKeeper().closeOffset();
2931                 std::abs(closeOffset.count()) >= 60)
2932                 l[jss::close_time_offset] =
2933                     static_cast<std::uint32_t>(closeOffset.count());
2934
2935             constexpr std::chrono::seconds highAgeThreshold{1000000};
            // NOTE(review): original line 2936 is elided — the condition
            // paired with the `else` below (presumably "this is the
            // validated ledger"). Confirm before relying on this listing.
2937             {
2938                 auto const age = m_ledgerMaster.getValidatedLedgerAge();
2939                 l[jss::age] =
2940                     Json::UInt(age < highAgeThreshold ? age.count() : 0);
2941             }
2942             else
2943             {
2944                 auto lCloseTime = lpClosed->info().closeTime;
2945                 auto closeTime = app_.timeKeeper().closeTime();
2946                 if (lCloseTime <= closeTime)
2947                 {
2948                     using namespace std::chrono_literals;
2949                     auto age = closeTime - lCloseTime;
2950                     l[jss::age] =
2951                         Json::UInt(age < highAgeThreshold ? age.count() : 0);
2952                 }
2953             }
2954         }
2955
2956         if (valid)
2957             info[jss::validated_ledger] = l;
2958         else
2959             info[jss::closed_ledger] = l;
2960
2961         auto lpPublished = m_ledgerMaster.getPublishedLedger();
2962         if (!lpPublished)
2963             info[jss::published_ledger] = "none";
2964         else if (lpPublished->info().seq != lpClosed->info().seq)
2965             info[jss::published_ledger] = lpPublished->info().seq;
2966     }
2967
2968     accounting_.json(info);
2969     info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
    // NOTE(review): original lines 2971, 2973 and 2975 are elided — the
    // right-hand sides of these three counters.
2970     info[jss::jq_trans_overflow] =
2972     info[jss::peer_disconnects] =
2974     info[jss::peer_disconnects_resources] =
2976
2977     // This array must be sorted in increasing order.
2978     static constexpr std::array<std::string_view, 7> protocols{
2979         "http", "https", "peer", "ws", "ws2", "wss", "wss2"};
2980     static_assert(std::is_sorted(std::begin(protocols), std::end(protocols)));
2981     {
        // NOTE(review): original line 2982 is elided — presumably the
        // declaration of the `ports` array moved into `info` below.
2983         for (auto const& port : app_.getServerHandler().setup().ports)
2984         {
2985             // Don't publish admin ports for non-admin users
2986             if (!admin &&
2987                 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() &&
2988                   port.admin_user.empty() && port.admin_password.empty()))
2989                 continue;
            // NOTE(review): original lines 2990-2991 are elided — presumably
            // the declaration of `proto` and the set-intersection call these
            // arguments belong to (intersecting port.protocol with the sorted
            // `protocols` list above).
2992                 std::begin(port.protocol),
2993                 std::end(port.protocol),
2994                 std::begin(protocols),
2995                 std::end(protocols),
2996                 std::back_inserter(proto));
2997             if (!proto.empty())
2998             {
2999                 auto& jv = ports.append(Json::Value(Json::objectValue));
3000                 jv[jss::port] = std::to_string(port.port);
3001                 jv[jss::protocol] = Json::Value{Json::arrayValue};
3002                 for (auto const& p : proto)
3003                     jv[jss::protocol].append(p);
3004             }
3005         }
3006
3007         if (app_.config().exists(SECTION_PORT_GRPC))
3008         {
3009             auto const& grpcSection = app_.config().section(SECTION_PORT_GRPC);
3010             auto const optPort = grpcSection.get("port");
3011             if (optPort && grpcSection.get("ip"))
3012             {
3013                 auto& jv = ports.append(Json::Value(Json::objectValue));
3014                 jv[jss::port] = *optPort;
3015                 jv[jss::protocol] = Json::Value{Json::arrayValue};
3016                 jv[jss::protocol].append("grpc");
3017             }
3018         }
3019         info[jss::ports] = std::move(ports);
3020     }
3021
3022     return info;
3023}
3024
3025void
3030
3036
// Publish a proposed (not yet validated) transaction to the real-time
// transaction stream subscribers, pruning dead weak_ptr entries as we go,
// then fan out to per-account real-time subscribers.
// NOTE(review): elided original lines — 3038 (the qualified function name;
// the call below suggests this is the counterpart of
// pubProposedAccountTransaction) and 3052 (presumably a lock guarding
// mStreamMaps for the scope that follows). Confirm against the repository.
3037 void
3039     std::shared_ptr<ReadView const> const& ledger,
3040     std::shared_ptr<STTx const> const& transaction,
3041     TER result)
3042{
3043     // never publish an inner txn inside a batch txn
3044     if (transaction->isFlag(tfInnerBatchTxn) &&
3045         ledger->rules().enabled(featureBatch))
3046         return;
3047
3048     MultiApiJson jvObj =
3049         transJson(transaction, result, false, ledger, std::nullopt);
3050
3051     {
3053
3054         auto it = mStreamMaps[sRTTransactions].begin();
3055         while (it != mStreamMaps[sRTTransactions].end())
3056         {
3057             InfoSub::pointer p = it->second.lock();
3058
3059             if (p)
3060             {
                // Send the JSON shaped for this subscriber's API version.
3061                 jvObj.visit(
3062                     p->getApiVersion(), //
3063                     [&](Json::Value const& jv) { p->send(jv, true); });
3064                 ++it;
3065             }
3066             else
3067             {
                // Subscriber is gone; drop its entry from the stream map.
3068                 it = mStreamMaps[sRTTransactions].erase(it);
3069             }
3070         }
3071     }
3072
3073     pubProposedAccountTransaction(ledger, transaction, result);
3074}
3075
// Publish an accepted ledger: emit "ledgerClosed" on the ledger stream,
// book deltas on the book_changes stream, start any deferred account-history
// subscriptions on the first published ledger, and finally publish every
// accepted transaction in the ledger.
// NOTE(review): elided original lines — 3077 (qualified name + the
// `lpAccepted` parameter; the XRPL_ASSERT below names this function
// NetworkOPsImp::pubLedger), 3082 (presumably the declaration/initialization
// of `alpAccepted`), 3100 (presumably a lock over the subscription maps),
// 3104 (presumably the declaration of `jvObj`), 3123/3126 (the guard and
// value for `validated_ledgers`), 3174 and 3187 (the calls whose arguments
// appear on the following lines). Confirm against the repository.
3076 void
3078{
3079     // Ledgers are published only when they acquire sufficient validations
3080     // Holes are filled across connection loss or other catastrophe
3081
3083         app_.getAcceptedLedgerCache().fetch(lpAccepted->info().hash);
3084     if (!alpAccepted)
3085     {
3086         alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted, app_);
3087         app_.getAcceptedLedgerCache().canonicalize_replace_client(
3088             lpAccepted->info().hash, alpAccepted);
3089     }
3090
3091     XRPL_ASSERT(
3092         alpAccepted->getLedger().get() == lpAccepted.get(),
3093         "ripple::NetworkOPsImp::pubLedger : accepted input");
3094
3095     {
3096         JLOG(m_journal.debug())
3097             << "Publishing ledger " << lpAccepted->info().seq << " "
3098             << lpAccepted->info().hash;
3099
3101
3102         if (!mStreamMaps[sLedger].empty())
3103         {
3105
3106             jvObj[jss::type] = "ledgerClosed";
3107             jvObj[jss::ledger_index] = lpAccepted->info().seq;
3108             jvObj[jss::ledger_hash] = to_string(lpAccepted->info().hash);
3109             jvObj[jss::ledger_time] = Json::Value::UInt(
3110                 lpAccepted->info().closeTime.time_since_epoch().count());
3111
3112             jvObj[jss::network_id] = app_.config().NETWORK_ID;
3113
            // Legacy fee_ref is only reported pre-XRPFees amendment.
3114             if (!lpAccepted->rules().enabled(featureXRPFees))
3115                 jvObj[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
3116             jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3117             jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3118             jvObj[jss::reserve_inc] =
3119                 lpAccepted->fees().increment.jsonClipped();
3120
3121             jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
3122
3124             {
3125                 jvObj[jss::validated_ledgers] =
3127             }
3128
3129             auto it = mStreamMaps[sLedger].begin();
3130             while (it != mStreamMaps[sLedger].end())
3131             {
3132                 InfoSub::pointer p = it->second.lock();
3133                 if (p)
3134                 {
3135                     p->send(jvObj, true);
3136                     ++it;
3137                 }
3138                 else
3139                     it = mStreamMaps[sLedger].erase(it);
3140             }
3141         }
3142
3143         if (!mStreamMaps[sBookChanges].empty())
3144         {
3145             Json::Value jvObj = ripple::RPC::computeBookChanges(lpAccepted);
3146
3147             auto it = mStreamMaps[sBookChanges].begin();
3148             while (it != mStreamMaps[sBookChanges].end())
3149             {
3150                 InfoSub::pointer p = it->second.lock();
3151                 if (p)
3152                 {
3153                     p->send(jvObj, true);
3154                     ++it;
3155                 }
3156                 else
3157                     it = mStreamMaps[sBookChanges].erase(it);
3158             }
3159         }
3160
3161         {
3162             static bool firstTime = true;
3163             if (firstTime)
3164             {
3165                 // First validated ledger, start delayed SubAccountHistory
3166                 firstTime = false;
3167                 for (auto& outer : mSubAccountHistory)
3168                 {
3169                     for (auto& inner : outer.second)
3170                     {
3171                         auto& subInfo = inner.second;
                        // separationLedgerSeq_ == 0 marks a subscription
                        // that has not been started yet.
3172                         if (subInfo.index_->separationLedgerSeq_ == 0)
3173                         {
3175                                 alpAccepted->getLedger(), subInfo);
3176                         }
3177                     }
3178                 }
3179             }
3180         }
3181     }
3182
3183     // Don't lock since pubAcceptedTransaction is locking.
3184     for (auto const& accTx : *alpAccepted)
3185     {
3186         JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
3188             lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3189     }
3190}
3191
// Compare the current fee summary against the last one published and, only
// if it changed, schedule a client job that publishes the new server fees.
// NOTE(review): elided original lines — 3193 (the qualified name; the job
// label below says "reportFeeChange->pubServer"), 3195/3197 (the fee-summary
// construction these arguments belong to, presumably producing `f`), and
// 3203 (presumably the job-queue addJob call taking the arguments below).
3192 void
3194{
3196         app_.openLedger().current()->fees().base,
3198         app_.getFeeTrack()};
3199
3200     // only schedule the job if something has changed
3201     if (f != mLastFeeSummary)
3202     {
3204             jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
3205                 pubServer();
3206             });
3207     }
3208}
3209
// Schedule a client job that publishes a consensus phase change.
// NOTE(review): elided original lines — 3211 (the qualified name and the
// `phase` parameter; the job label below says
// "reportConsensusStateChange->pubConsensus") and 3213-3214 (presumably the
// job-queue addJob call and job type these arguments belong to). Confirm.
3210 void
3212{
3215         "reportConsensusStateChange->pubConsensus",
3216         [this, phase]() { pubConsensus(phase); });
3217}
3218
// Sweep the held local transactions against the given view, delegating to
// the LocalTxs container.
// NOTE(review): original line 3220 (the qualified name and the `view`
// parameter) is elided from this listing — confirm against the repository.
3219 inline void
3221{
3222     m_localTX->sweep(view);
3223}
// Report how many locally-submitted transactions are currently being held.
// NOTE(review): original line 3225 (the qualified name, presumably
// NetworkOPsImp::getLocalTxCount) is elided from this listing.
3224 inline std::size_t
3226{
3227     return m_localTX->size();
3228}
3229
3230 // This routine should only be used to publish accepted or validated
3231 // transactions.
// Build the per-API-version JSON representation of a transaction for the
// streaming interfaces: base tx JSON, optional metadata (with synthetic
// fields), CTID when derivable, ledger linkage, engine result, owner_funds
// for offers, and finally the per-version hash/tx_json placement.
// NOTE(review): elided original lines — 3232-3233 (return type and qualified
// name; callers invoke this as transJson(...)), 3238 (the optional `meta`
// parameter), 3240 (presumably the declaration of `jvObj`), 3256 and 3259
// (the RPC::insert* calls these argument lines belong to), 3272 (the
// `if (auto const ctid = ...)` opener), 3313 (an accountFunds argument),
// 3321 and 3324 (the visit() boilerplate). Confirm against the repository.
3234     std::shared_ptr<STTx const> const& transaction,
3235     TER result,
3236     bool validated,
3237     std::shared_ptr<ReadView const> const& ledger,
3239{
3241     std::string sToken;
3242     std::string sHuman;
3243
3244     transResultInfo(result, sToken, sHuman);
3245
3246     jvObj[jss::type] = "transaction";
3247     // NOTE jvObj is not a finished object for either API version. After
3248     // it's populated, we need to finish it for a specific API version. This is
3249     // done in a loop, near the end of this function.
3250     jvObj[jss::transaction] =
3251         transaction->getJson(JsonOptions::disable_API_prior_V2, false);
3252
3253     if (meta)
3254     {
3255         jvObj[jss::meta] = meta->get().getJson(JsonOptions::none);
3257             jvObj[jss::meta], *ledger, transaction, meta->get());
3258         RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
3260             jvObj[jss::meta], transaction, meta->get());
3261     }
3262
3263     // add CTID where the needed data for it exists
3264     if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
3265         lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3266     {
3267         uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
        // Prefer the transaction's own NetworkID field over the config's.
3268         uint32_t netID = app_.config().NETWORK_ID;
3269         if (transaction->isFieldPresent(sfNetworkID))
3270             netID = transaction->getFieldU32(sfNetworkID);
3271
3273                 RPC::encodeCTID(ledger->info().seq, txnSeq, netID);
3274             ctid)
3275             jvObj[jss::ctid] = *ctid;
3276     }
3277     if (!ledger->open())
3278         jvObj[jss::ledger_hash] = to_string(ledger->info().hash);
3279
3280     if (validated)
3281     {
3282         jvObj[jss::ledger_index] = ledger->info().seq;
3283         jvObj[jss::transaction][jss::date] =
3284             ledger->info().closeTime.time_since_epoch().count();
3285         jvObj[jss::validated] = true;
3286         jvObj[jss::close_time_iso] = to_string_iso(ledger->info().closeTime);
3287
3288         // WRITEME: Put the account next seq here
3289     }
3290     else
3291     {
3292         jvObj[jss::validated] = false;
3293         jvObj[jss::ledger_current_index] = ledger->info().seq;
3294     }
3295
3296     jvObj[jss::status] = validated ? "closed" : "proposed";
3297     jvObj[jss::engine_result] = sToken;
3298     jvObj[jss::engine_result_code] = result;
3299     jvObj[jss::engine_result_message] = sHuman;
3300
3301     if (transaction->getTxnType() == ttOFFER_CREATE)
3302     {
3303         auto const account = transaction->getAccountID(sfAccount);
3304         auto const amount = transaction->getFieldAmount(sfTakerGets);
3305
3306         // If the offer create is not self funded then add the owner balance
3307         if (account != amount.issue().account)
3308         {
3309             auto const ownerFunds = accountFunds(
3310                 *ledger,
3311                 account,
3312                 amount,
3314                 app_.journal("View"));
3315             jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3316         }
3317     }
3318
3319     std::string const hash = to_string(transaction->getTransactionID());
3320     MultiApiJson multiObj{jvObj};
3322         multiObj.visit(), //
3323         [&]<unsigned Version>(
3325             RPC::insertDeliverMax(
3326                 jvTx[jss::transaction], transaction->getTxnType(), Version);
3327
            // API v2+ renames "transaction" to "tx_json" and hoists the
            // hash to the top level; v1 keeps the hash inside the tx.
3328             if constexpr (Version > 1)
3329             {
3330                 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3331                 jvTx[jss::hash] = hash;
3332             }
3333             else
3334             {
3335                 jvTx[jss::transaction][jss::hash] = hash;
3336             }
3337         });
3338
3339     return multiObj;
3340}
3341
// Publish an accepted/validated transaction to both the `transactions` and
// real-time transaction streams (pruning dead subscribers), feed successful
// transactions to the order book tracker, then fan out per-account.
// NOTE(review): elided original lines — 3343 (the qualified function name)
// and 3356 (presumably a lock guarding mStreamMaps for the scope below).
3342 void
3344     std::shared_ptr<ReadView const> const& ledger,
3345     AcceptedLedgerTx const& transaction,
3346     bool last)
3347{
3348     auto const& stTxn = transaction.getTxn();
3349
3350     // Create two different Json objects, for different API versions
3351     auto const metaRef = std::ref(transaction.getMeta());
3352     auto const trResult = transaction.getResult();
3353     MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3354
3355     {
3357
3358         auto it = mStreamMaps[sTransactions].begin();
3359         while (it != mStreamMaps[sTransactions].end())
3360         {
3361             InfoSub::pointer p = it->second.lock();
3362
3363             if (p)
3364             {
3365                 jvObj.visit(
3366                     p->getApiVersion(), //
3367                     [&](Json::Value const& jv) { p->send(jv, true); });
3368                 ++it;
3369             }
3370             else
3371                 it = mStreamMaps[sTransactions].erase(it);
3372         }
3373
3374         it = mStreamMaps[sRTTransactions].begin();
3375
3376         while (it != mStreamMaps[sRTTransactions].end())
3377         {
3378             InfoSub::pointer p = it->second.lock();
3379
3380             if (p)
3381             {
3382                 jvObj.visit(
3383                     p->getApiVersion(), //
3384                     [&](Json::Value const& jv) { p->send(jv, true); });
3385                 ++it;
3386             }
3387             else
3388                 it = mStreamMaps[sRTTransactions].erase(it);
3389         }
3390     }
3391
3392     if (transaction.getResult() == tesSUCCESS)
3393         app_.getOrderBookDB().processTxn(ledger, transaction, jvObj);
3394
3395     pubAccountTransaction(ledger, transaction, last);
3396}
3397
// Fan an accepted transaction out to per-account subscribers: real-time
// (`accounts_proposed`) listeners, accepted (`accounts`) listeners, and
// account_history subscribers past their separation ledger. Dead weak_ptr
// entries are pruned while scanning. `last` marks the final transaction of
// the ledger (sets account_history_boundary).
// NOTE(review): elided original lines — 3399 (qualified name; the assert and
// trace below name this NetworkOPsImp::pubAccountTransaction), 3404
// (presumably the declaration of the `notify` set), 3411 (presumably a lock
// over the subscription maps), 3414 (the third operand of the `||`
// condition), and 3514 (the second operand of the assert comparison).
3398 void
3400     std::shared_ptr<ReadView const> const& ledger,
3401     AcceptedLedgerTx const& transaction,
3402     bool last)
3403{
3405     int iProposed = 0;
3406     int iAccepted = 0;
3407
3408     std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3409     auto const currLedgerSeq = ledger->seq();
3410     {
3412
3413         if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
3415         {
3416             for (auto const& affectedAccount : transaction.getAffected())
3417             {
3418                 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3419                     simiIt != mSubRTAccount.end())
3420                 {
3421                     auto it = simiIt->second.begin();
3422
3423                     while (it != simiIt->second.end())
3424                     {
3425                         InfoSub::pointer p = it->second.lock();
3426
3427                         if (p)
3428                         {
3429                             notify.insert(p);
3430                             ++it;
3431                             ++iProposed;
3432                         }
3433                         else
3434                             it = simiIt->second.erase(it);
3435                     }
3436                 }
3437
3438                 if (auto simiIt = mSubAccount.find(affectedAccount);
3439                     simiIt != mSubAccount.end())
3440                 {
3441                     auto it = simiIt->second.begin();
3442                     while (it != simiIt->second.end())
3443                     {
3444                         InfoSub::pointer p = it->second.lock();
3445
3446                         if (p)
3447                         {
3448                             notify.insert(p);
3449                             ++it;
3450                             ++iAccepted;
3451                         }
3452                         else
3453                             it = simiIt->second.erase(it);
3454                     }
3455                 }
3456
3457                 if (auto histoIt = mSubAccountHistory.find(affectedAccount);
3458                     histoIt != mSubAccountHistory.end())
3459                 {
3460                     auto& subs = histoIt->second;
3461                     auto it = subs.begin();
3462                     while (it != subs.end())
3463                     {
3464                         SubAccountHistoryInfoWeak const& info = it->second;
                        // Ledgers at or before the separation point are
                        // delivered by the backfill job, not here.
3465                         if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3466                         {
3467                             ++it;
3468                             continue;
3469                         }
3470
3471                         if (auto isSptr = info.sinkWptr_.lock(); isSptr)
3472                         {
3473                             accountHistoryNotify.emplace_back(
3474                                 SubAccountHistoryInfo{isSptr, info.index_});
3475                             ++it;
3476                         }
3477                         else
3478                         {
3479                             it = subs.erase(it);
3480                         }
3481                     }
3482                     if (subs.empty())
3483                         mSubAccountHistory.erase(histoIt);
3484                 }
3485             }
3486         }
3487     }
3488
3489     JLOG(m_journal.trace())
3490         << "pubAccountTransaction: "
3491         << "proposed=" << iProposed << ", accepted=" << iAccepted;
3492
3493     if (!notify.empty() || !accountHistoryNotify.empty())
3494     {
3495         auto const& stTxn = transaction.getTxn();
3496
3497         // Create two different Json objects, for different API versions
3498         auto const metaRef = std::ref(transaction.getMeta());
3499         auto const trResult = transaction.getResult();
3500         MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3501
3502         for (InfoSub::ref isrListener : notify)
3503         {
3504             jvObj.visit(
3505                 isrListener->getApiVersion(), //
3506                 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3507         }
3508
3509         if (last)
3510             jvObj.set(jss::account_history_boundary, true);
3511
3512         XRPL_ASSERT(
3513             jvObj.isMember(jss::account_history_tx_stream) ==
3515             "ripple::NetworkOPsImp::pubAccountTransaction : "
3516             "account_history_tx_stream not set");
3517         for (auto& info : accountHistoryNotify)
3518         {
3519             auto& index = info.index_;
            // First forward tx with no history behind it marks the start of
            // the account_history stream for this subscriber.
3520             if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3521                 jvObj.set(jss::account_history_tx_first, true);
3522
3523             jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3524
3525             jvObj.visit(
3526                 info.sink_->getApiVersion(), //
3527                 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3528         }
3529     }
3530}
3531
// Fan a proposed (unvalidated) transaction out to real-time per-account
// subscribers of each mentioned account, pruning dead weak_ptr entries.
// Returns early when there are no real-time account subscriptions at all.
// NOTE(review): elided original lines — 3533 (qualified name; the trace and
// assert below name this pubProposedAccountTransaction), 3535 (the `tx`
// parameter used below), 3538 (presumably the declaration of the `notify`
// set), 3544 (presumably a lock over the subscription maps), 3550 (the third
// operand of the `||` condition), and 3591 (the second operand of the
// assert comparison). Confirm against the repository.
3532 void
3534     std::shared_ptr<ReadView const> const& ledger,
3536     TER result)
3537{
3539     int iProposed = 0;
3540
3541     std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3542
3543     {
3545
3546         if (mSubRTAccount.empty())
3547             return;
3548
3549         if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
3551         {
3552             for (auto const& affectedAccount : tx->getMentionedAccounts())
3553             {
3554                 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3555                     simiIt != mSubRTAccount.end())
3556                 {
3557                     auto it = simiIt->second.begin();
3558
3559                     while (it != simiIt->second.end())
3560                     {
3561                         InfoSub::pointer p = it->second.lock();
3562
3563                         if (p)
3564                         {
3565                             notify.insert(p);
3566                             ++it;
3567                             ++iProposed;
3568                         }
3569                         else
3570                             it = simiIt->second.erase(it);
3571                     }
3572                 }
3573             }
3574         }
3575     }
3576
3577     JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
3578
3579     if (!notify.empty() || !accountHistoryNotify.empty())
3580     {
3581         // Create two different Json objects, for different API versions
3582         MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3583
3584         for (InfoSub::ref isrListener : notify)
3585             jvObj.visit(
3586                 isrListener->getApiVersion(), //
3587                 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3588
3589         XRPL_ASSERT(
3590             jvObj.isMember(jss::account_history_tx_stream) ==
3592             "ripple::NetworkOPs::pubProposedAccountTransaction : "
3593             "account_history_tx_stream not set");
3594         for (auto& info : accountHistoryNotify)
3595         {
3596             auto& index = info.index_;
3597             if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3598                 jvObj.set(jss::account_history_tx_first, true);
3599             jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3600             jvObj.visit(
3601                 info.sink_->getApiVersion(), //
3602                 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3603         }
3604     }
3605}
3606
3607//
3608// Monitoring
3609//
3610
// Subscribe a listener to a set of accounts. `rt` selects the real-time
// (proposed) map vs. the accepted map. The subscription is recorded both on
// the InfoSub itself and in the server-side map keyed by listener sequence.
// NOTE(review): elided original lines — 3612 (qualified name; the trace
// message below says "subAccount") and 3627 (presumably a lock guarding
// `subMap` for the second loop). Confirm against the repository.
3611 void
3613     InfoSub::ref isrListener,
3614     hash_set<AccountID> const& vnaAccountIDs,
3615     bool rt)
3616{
3617     SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3618
3619     for (auto const& naAccountID : vnaAccountIDs)
3620     {
3621         JLOG(m_journal.trace())
3622             << "subAccount: account: " << toBase58(naAccountID);
3623
3624         isrListener->insertSubAccountInfo(naAccountID, rt);
3625     }
3626
3628
3629     for (auto const& naAccountID : vnaAccountIDs)
3630     {
3631         auto simIterator = subMap.find(naAccountID);
3632         if (simIterator == subMap.end())
3633         {
3634             // Not found, note that account has a new single listner.
3635             SubMapType usisElement;
3636             usisElement[isrListener->getSeq()] = isrListener;
3637             // VFALCO NOTE This is making a needless copy of naAccountID
3638             subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3639         }
3640         else
3641         {
3642             // Found, note that the account has another listener.
3643             simIterator->second[isrListener->getSeq()] = isrListener;
3644         }
3645     }
3646}
3647
// Unsubscribe a listener from a set of accounts: first remove the records
// held on the InfoSub itself, then remove the server-side map entries via
// unsubAccountInternal (keyed by the listener's sequence number).
// NOTE(review): original line 3649 (the qualified function name) is elided
// from this listing — confirm against the repository.
3648 void
3650     InfoSub::ref isrListener,
3651     hash_set<AccountID> const& vnaAccountIDs,
3652     bool rt)
3653{
3654     for (auto const& naAccountID : vnaAccountIDs)
3655     {
3656         // Remove from the InfoSub
3657         isrListener->deleteSubAccountInfo(naAccountID, rt);
3658     }
3659
3660     // Remove from the server
3661     unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3662}
3663
// Remove a listener (identified by its sequence number) from the server-side
// subscription map for each of the given accounts, erasing an account's map
// entry entirely once its last listener is gone. `rt` selects the real-time
// map vs. the accepted map.
// NOTE(review): elided original lines — 3665 (the qualified function name;
// the caller above invokes this as unsubAccountInternal) and 3670
// (presumably a lock guarding `subMap`). Confirm against the repository.
3664 void
3666     std::uint64_t uSeq,
3667     hash_set<AccountID> const& vnaAccountIDs,
3668     bool rt)
3669{
3671
3672     SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3673
3674     for (auto const& naAccountID : vnaAccountIDs)
3675     {
3676         auto simIterator = subMap.find(naAccountID);
3677
3678         if (simIterator != subMap.end())
3679         {
3680             // Found
3681             simIterator->second.erase(uSeq);
3682
3683             if (simIterator->second.empty())
3684             {
3685                 // Don't need hash entry.
3686                 subMap.erase(simIterator);
3687             }
3688         }
3689     }
3690}
3691
3692void
3694{
3695 enum DatabaseType { Sqlite, None };
3696 static auto const databaseType = [&]() -> DatabaseType {
3697 // Use a dynamic_cast to return DatabaseType::None
3698 // on failure.
3699 if (dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
3700 {
3701 return DatabaseType::Sqlite;
3702 }
3703 return DatabaseType::None;
3704 }();
3705
3706 if (databaseType == DatabaseType::None)
3707 {
3708 // LCOV_EXCL_START
3709 UNREACHABLE(
3710 "ripple::NetworkOPsImp::addAccountHistoryJob : no database");
3711 JLOG(m_journal.error())
3712 << "AccountHistory job for account "
3713 << toBase58(subInfo.index_->accountId_) << " no database";
3714 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3715 {
3716 sptr->send(rpcError(rpcINTERNAL), true);
3717 unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
3718 }
3719 return;
3720 // LCOV_EXCL_STOP
3721 }
3722
3725 "AccountHistoryTxStream",
3726 [this, dbType = databaseType, subInfo]() {
3727 auto const& accountId = subInfo.index_->accountId_;
3728 auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
3729 auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
3730
3731 JLOG(m_journal.trace())
3732 << "AccountHistory job for account " << toBase58(accountId)
3733 << " started. lastLedgerSeq=" << lastLedgerSeq;
3734
3735 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
3736 std::shared_ptr<TxMeta> const& meta) -> bool {
3737 /*
3738 * genesis account: first tx is the one with seq 1
3739 * other account: first tx is the one created the account
3740 */
3741 if (accountId == genesisAccountId)
3742 {
3743 auto stx = tx->getSTransaction();
3744 if (stx->getAccountID(sfAccount) == accountId &&
3745 stx->getSeqValue() == 1)
3746 return true;
3747 }
3748
3749 for (auto& node : meta->getNodes())
3750 {
3751 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3752 continue;
3753
3754 if (node.isFieldPresent(sfNewFields))
3755 {
3756 if (auto inner = dynamic_cast<STObject const*>(
3757 node.peekAtPField(sfNewFields));
3758 inner)
3759 {
3760 if (inner->isFieldPresent(sfAccount) &&
3761 inner->getAccountID(sfAccount) == accountId)
3762 {
3763 return true;
3764 }
3765 }
3766 }
3767 }
3768
3769 return false;
3770 };
3771
3772 auto send = [&](Json::Value const& jvObj,
3773 bool unsubscribe) -> bool {
3774 if (auto sptr = subInfo.sinkWptr_.lock())
3775 {
3776 sptr->send(jvObj, true);
3777 if (unsubscribe)
3778 unsubAccountHistory(sptr, accountId, false);
3779 return true;
3780 }
3781
3782 return false;
3783 };
3784
3785 auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
3786 bool unsubscribe) -> bool {
3787 if (auto sptr = subInfo.sinkWptr_.lock())
3788 {
3789 jvObj.visit(
3790 sptr->getApiVersion(), //
3791 [&](Json::Value const& jv) { sptr->send(jv, true); });
3792
3793 if (unsubscribe)
3794 unsubAccountHistory(sptr, accountId, false);
3795 return true;
3796 }
3797
3798 return false;
3799 };
3800
3801 auto getMoreTxns =
3802 [&](std::uint32_t minLedger,
3803 std::uint32_t maxLedger,
3808 switch (dbType)
3809 {
3810 case Sqlite: {
3811 auto db = static_cast<SQLiteDatabase*>(
3814 accountId, minLedger, maxLedger, marker, 0, true};
3815 return db->newestAccountTxPage(options);
3816 }
3817 // LCOV_EXCL_START
3818 default: {
3819 UNREACHABLE(
3820 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3821 "getMoreTxns : invalid database type");
3822 return {};
3823 }
3824 // LCOV_EXCL_STOP
3825 }
3826 };
3827
3828 /*
3829 * search backward until the genesis ledger or asked to stop
3830 */
3831 while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
3832 {
3833 int feeChargeCount = 0;
3834 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3835 {
3836 sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
3837 ++feeChargeCount;
3838 }
3839 else
3840 {
3841 JLOG(m_journal.trace())
3842 << "AccountHistory job for account "
3843 << toBase58(accountId) << " no InfoSub. Fee charged "
3844 << feeChargeCount << " times.";
3845 return;
3846 }
3847
3848 // try to search in 1024 ledgers till reaching genesis ledgers
3849 auto startLedgerSeq =
3850 (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3851 JLOG(m_journal.trace())
3852 << "AccountHistory job for account " << toBase58(accountId)
3853 << ", working on ledger range [" << startLedgerSeq << ","
3854 << lastLedgerSeq << "]";
3855
3856 auto haveRange = [&]() -> bool {
3857 std::uint32_t validatedMin = UINT_MAX;
3858 std::uint32_t validatedMax = 0;
3859 auto haveSomeValidatedLedgers =
3861 validatedMin, validatedMax);
3862
3863 return haveSomeValidatedLedgers &&
3864 validatedMin <= startLedgerSeq &&
3865 lastLedgerSeq <= validatedMax;
3866 }();
3867
3868 if (!haveRange)
3869 {
3870 JLOG(m_journal.debug())
3871 << "AccountHistory reschedule job for account "
3872 << toBase58(accountId) << ", incomplete ledger range ["
3873 << startLedgerSeq << "," << lastLedgerSeq << "]";
3875 return;
3876 }
3877
3879 while (!subInfo.index_->stopHistorical_)
3880 {
3881 auto dbResult =
3882 getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3883 if (!dbResult)
3884 {
3885 // LCOV_EXCL_START
3886 UNREACHABLE(
3887 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3888 "getMoreTxns failed");
3889 JLOG(m_journal.debug())
3890 << "AccountHistory job for account "
3891 << toBase58(accountId) << " getMoreTxns failed.";
3892 send(rpcError(rpcINTERNAL), true);
3893 return;
3894 // LCOV_EXCL_STOP
3895 }
3896
3897 auto const& txns = dbResult->first;
3898 marker = dbResult->second;
3899 size_t num_txns = txns.size();
3900 for (size_t i = 0; i < num_txns; ++i)
3901 {
3902 auto const& [tx, meta] = txns[i];
3903
3904 if (!tx || !meta)
3905 {
3906 JLOG(m_journal.debug())
3907 << "AccountHistory job for account "
3908 << toBase58(accountId) << " empty tx or meta.";
3909 send(rpcError(rpcINTERNAL), true);
3910 return;
3911 }
3912 auto curTxLedger =
3914 tx->getLedger());
3915 if (!curTxLedger)
3916 {
3917 // LCOV_EXCL_START
3918 UNREACHABLE(
3919 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3920 "getLedgerBySeq failed");
3921 JLOG(m_journal.debug())
3922 << "AccountHistory job for account "
3923 << toBase58(accountId) << " no ledger.";
3924 send(rpcError(rpcINTERNAL), true);
3925 return;
3926 // LCOV_EXCL_STOP
3927 }
3929 tx->getSTransaction();
3930 if (!stTxn)
3931 {
3932 // LCOV_EXCL_START
3933 UNREACHABLE(
3934 "NetworkOPsImp::addAccountHistoryJob : "
3935 "getSTransaction failed");
3936 JLOG(m_journal.debug())
3937 << "AccountHistory job for account "
3938 << toBase58(accountId)
3939 << " getSTransaction failed.";
3940 send(rpcError(rpcINTERNAL), true);
3941 return;
3942 // LCOV_EXCL_STOP
3943 }
3944
3945 auto const mRef = std::ref(*meta);
3946 auto const trR = meta->getResultTER();
3947 MultiApiJson jvTx =
3948 transJson(stTxn, trR, true, curTxLedger, mRef);
3949
3950 jvTx.set(
3951 jss::account_history_tx_index, txHistoryIndex--);
3952 if (i + 1 == num_txns ||
3953 txns[i + 1].first->getLedger() != tx->getLedger())
3954 jvTx.set(jss::account_history_boundary, true);
3955
3956 if (isFirstTx(tx, meta))
3957 {
3958 jvTx.set(jss::account_history_tx_first, true);
3959 sendMultiApiJson(jvTx, false);
3960
3961 JLOG(m_journal.trace())
3962 << "AccountHistory job for account "
3963 << toBase58(accountId)
3964 << " done, found last tx.";
3965 return;
3966 }
3967 else
3968 {
3969 sendMultiApiJson(jvTx, false);
3970 }
3971 }
3972
3973 if (marker)
3974 {
3975 JLOG(m_journal.trace())
3976 << "AccountHistory job for account "
3977 << toBase58(accountId)
3978 << " paging, marker=" << marker->ledgerSeq << ":"
3979 << marker->txnSeq;
3980 }
3981 else
3982 {
3983 break;
3984 }
3985 }
3986
3987 if (!subInfo.index_->stopHistorical_)
3988 {
3989 lastLedgerSeq = startLedgerSeq - 1;
3990 if (lastLedgerSeq <= 1)
3991 {
3992 JLOG(m_journal.trace())
3993 << "AccountHistory job for account "
3994 << toBase58(accountId)
3995 << " done, reached genesis ledger.";
3996 return;
3997 }
3998 }
3999 }
4000 });
4001}
4002
// NOTE(review): Doxygen-rendered listing — the original file's line numbers
// are fused into each line and some lines (e.g. 4004/4006, the signature)
// were elided by the renderer. Code left byte-identical; comments only.
//
// Prepares an account_history subscription for streaming: records the
// separation ledger sequence, bails out when the account does not exist in
// `ledger` or is the genesis account with no transactions, then schedules
// the AccountHistory backfill job.
4003void
4005 std::shared_ptr<ReadView const> const& ledger,
4007{
// Ledger at which the live stream and the historical backfill meet.
4008 subInfo.index_->separationLedgerSeq_ = ledger->seq();
4009 auto const& accountId = subInfo.index_->accountId_;
4010 auto const accountKeylet = keylet::account(accountId);
// Account absent from the ledger => nothing to backfill.
4011 if (!ledger->exists(accountKeylet))
4012 {
4013 JLOG(m_journal.debug())
4014 << "subAccountHistoryStart, no account " << toBase58(accountId)
4015 << ", no need to add AccountHistory job.";
4016 return;
4017 }
// A genesis account whose Sequence is still 1 has never submitted a
// transaction, so there is no history to fetch.
4018 if (accountId == genesisAccountId)
4019 {
4020 if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
4021 {
4022 if (sleAcct->getFieldU32(sfSequence) == 1)
4023 {
4024 JLOG(m_journal.debug())
4025 << "subAccountHistoryStart, genesis account "
4026 << toBase58(accountId)
4027 << " does not have tx, no need to add AccountHistory job.";
4028 return;
4029 }
4030 }
4031 else
4032 {
4033 // LCOV_EXCL_START
4034 UNREACHABLE(
4035 "ripple::NetworkOPsImp::subAccountHistoryStart : failed to "
4036 "access genesis account");
4037 return;
4038 // LCOV_EXCL_STOP
4039 }
4040 }
4041 subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
4042 subInfo.index_->haveHistorical_ = true;
4043
4044 JLOG(m_journal.debug())
4045 << "subAccountHistoryStart, add AccountHistory job: accountId="
4046 << toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq();
4047
4048 addAccountHistoryJob(subInfo);
4049}
4050
// NOTE(review): rendered listing with elided lines (4051-4052 signature,
// 4064-4065, 4070, 4072). Code left byte-identical; comments only.
//
// Registers `isrListener` for an account's transaction-history stream.
// Returns rpcINVALID_PARAMS on a duplicate subscription, rpcSUCCESS
// otherwise; when no validated ledger exists yet, streaming start is
// deferred but the subscription still succeeds.
4053 InfoSub::ref isrListener,
4054 AccountID const& accountId)
4055{
// The InfoSub tracks its own subscription set; duplicates rejected here.
4056 if (!isrListener->insertSubAccountHistory(accountId))
4057 {
4058 JLOG(m_journal.debug())
4059 << "subAccountHistory, already subscribed to account "
4060 << toBase58(accountId);
4061 return rpcINVALID_PARAMS;
4062 }
4063
4066 isrListener, std::make_shared<SubAccountHistoryIndex>(accountId)};
// Index entries by account, then by InfoSub sequence within the account.
4067 auto simIterator = mSubAccountHistory.find(accountId);
4068 if (simIterator == mSubAccountHistory.end())
4069 {
4071 inner.emplace(isrListener->getSeq(), ahi);
4073 simIterator, std::make_pair(accountId, inner));
4074 }
4075 else
4076 {
4077 simIterator->second.emplace(isrListener->getSeq(), ahi);
4078 }
4079
4080 auto const ledger = app_.getLedgerMaster().getValidatedLedger();
4081 if (ledger)
4082 {
4083 subAccountHistoryStart(ledger, ahi);
4084 }
4085 else
4086 {
4087 // The node does not have validated ledgers, so wait for
4088 // one before start streaming.
4089 // In this case, the subscription is also considered successful.
4090 JLOG(m_journal.debug())
4091 << "subAccountHistory, no validated ledger yet, delay start";
4092 }
4093
4094 return rpcSUCCESS;
4095}
4096
// Unsubscribes a listener from an account's history stream. When
// `historyOnly` is true only the historical backfill is stopped and the
// listener keeps its live-stream registration on the InfoSub.
4097void
4099 InfoSub::ref isrListener,
4100 AccountID const& account,
4101 bool historyOnly)
4102{
4103 if (!historyOnly)
4104 isrListener->deleteSubAccountHistory(account);
4105 unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
4106}
4107
// Shared unsubscription implementation, keyed by InfoSub sequence.
// Signals any running AccountHistory job to stop via stopHistorical_ and,
// unless historyOnly, erases the map entry (dropping the account's bucket
// once it is empty). (Line 4114, presumably a lock acquisition, was elided
// by the renderer — TODO confirm against the repository source.)
4108void
4110 std::uint64_t seq,
4111 AccountID const& account,
4112 bool historyOnly)
4113{
4115 auto simIterator = mSubAccountHistory.find(account);
4116 if (simIterator != mSubAccountHistory.end())
4117 {
4118 auto& subInfoMap = simIterator->second;
4119 auto subInfoIter = subInfoMap.find(seq);
4120 if (subInfoIter != subInfoMap.end())
4121 {
// Cooperative stop: the background job polls this flag.
4122 subInfoIter->second.index_->stopHistorical_ = true;
4123 }
4124
4125 if (!historyOnly)
4126 {
4127 simIterator->second.erase(seq);
4128 if (simIterator->second.empty())
4129 {
4130 mSubAccountHistory.erase(simIterator);
4131 }
4132 }
4133 JLOG(m_journal.debug())
4134 << "unsubAccountHistory, account " << toBase58(account)
4135 << ", historyOnly = " << (historyOnly ? "true" : "false");
4136 }
4137}
4138
// Subscribes a listener to an order book's update stream via OrderBookDB.
// Always returns true; a null listener group is treated as unreachable.
4139bool
4141{
4142 if (auto listeners = app_.getOrderBookDB().makeBookListeners(book))
4143 listeners->addSubscriber(isrListener);
4144 else
4145 {
4146 // LCOV_EXCL_START
4147 UNREACHABLE("ripple::NetworkOPsImp::subBook : null book listeners");
4148 // LCOV_EXCL_STOP
4149 }
4150 return true;
4151}
4152
// Removes a listener (by sequence) from an order book's update stream.
// Always returns true, even when no listener group exists for the book.
4153bool
4155{
4156 if (auto listeners = app_.getOrderBookDB().getBookListeners(book))
4157 listeners->removeSubscriber(uSeq);
4158
4159 return true;
4160}
4161
// Standalone-mode helper backing the `ledger_accept` RPC: closes the
// current ledger by simulating a consensus round, then returns the new
// current ledger sequence. Throws outside standalone mode.
4165{
4166 // This code-path is exclusively used when the server is in standalone
4167 // mode via `ledger_accept`
4168 XRPL_ASSERT(
4169 m_standalone, "ripple::NetworkOPsImp::acceptLedger : is standalone");
4170
4171 if (!m_standalone)
4172 Throw<std::runtime_error>(
4173 "Operation only possible in STANDALONE mode.");
4174
4175 // FIXME Could we improve on this and remove the need for a specialized
4176 // API in Consensus?
4177 beginConsensus(m_ledgerMaster.getClosedLedger()->info().hash, {});
4178 mConsensus.simulate(app_.timeKeeper().closeTime(), consensusDelay);
4179 return m_ledgerMaster.getCurrentLedger()->info().seq;
4180}
4181
4182// <-- bool: true=added, false=already there
// Subscribes a listener to the `ledger` stream, seeding jvResult with a
// snapshot of the latest validated ledger (seq, hash, close time, fees,
// reserves, network id) plus the complete-ledger range when available.
// Returns true when newly added, false when already subscribed.
// (Lines 4200, 4203, 4206 — haveValidated() guard and lock — were elided
// by the renderer.)
4183bool
4185{
4186 if (auto lpClosed = m_ledgerMaster.getValidatedLedger())
4187 {
4188 jvResult[jss::ledger_index] = lpClosed->info().seq;
4189 jvResult[jss::ledger_hash] = to_string(lpClosed->info().hash);
4190 jvResult[jss::ledger_time] = Json::Value::UInt(
4191 lpClosed->info().closeTime.time_since_epoch().count());
// fee_ref is only reported on networks without the XRPFees amendment.
4192 if (!lpClosed->rules().enabled(featureXRPFees))
4193 jvResult[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
4194 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4195 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4196 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4197 jvResult[jss::network_id] = app_.config().NETWORK_ID;
4198 }
4199
4201 {
4202 jvResult[jss::validated_ledgers] =
4204 }
4205
4207 return mStreamMaps[sLedger]
4208 .emplace(isrListener->getSeq(), isrListener)
4209 .second;
4210}
4211
4212// <-- bool: true=added, false=already there
// Subscribes a listener to a stream — presumably `book_changes` given the
// surrounding declarations; the signature (4214) and the stream-map
// selection lines (4216-4217) were elided by the renderer — TODO confirm.
// Returns true when newly added, false when already subscribed.
4213bool
4215{
4218 .emplace(isrListener->getSeq(), isrListener)
4219 .second;
4220}
4221
4222// <-- bool: true=erased, false=was not there
// Unsubscribes (by listener sequence) from the `ledger` stream.
// Returns true when an entry was actually erased.
4223bool
4225{
4227 return mStreamMaps[sLedger].erase(uSeq);
4228}
4229
4230// <-- bool: true=erased, false=was not there
4231bool
4237
4238// <-- bool: true=added, false=already there
// Subscribes a listener to the `manifests` stream.
// Returns true when newly added, false when already subscribed.
4239bool
4241{
4243 return mStreamMaps[sManifests]
4244 .emplace(isrListener->getSeq(), isrListener)
4245 .second;
4246}
4247
4248// <-- bool: true=erased, false=was not there
4249bool
4255
4256// <-- bool: true=added, false=already there
// Subscribes a listener to the `server` status stream and seeds jvResult
// with current server info: a random nonce, operating mode, fee-load base
// and factor, host id, and (on the elided line 4278) the node public key.
// Returns true when newly added, false when already subscribed.
4257bool
4259 InfoSub::ref isrListener,
4260 Json::Value& jvResult,
4261 bool admin)
4262{
4263 uint256 uRandom;
4264
4265 if (m_standalone)
4266 jvResult[jss::stand_alone] = m_standalone;
4267
4268 // CHECKME: is it necessary to provide a random number here?
4269 beast::rngfill(uRandom.begin(), uRandom.size(), crypto_prng())4270
4271 auto const& feeTrack = app_.getFeeTrack();
4272 jvResult[jss::random] = to_string(uRandom);
4273 jvResult[jss::server_status] = strOperatingMode(admin);
4274 jvResult[jss::load_base] = feeTrack.getLoadBase();
4275 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4276 jvResult[jss::hostid] = getHostId(admin);
4277 jvResult[jss::pubkey_node] =
4279
4281 return mStreamMaps[sServer]
4282 .emplace(isrListener->getSeq(), isrListener)
4283 .second;
4284}
4285
4286// <-- bool: true=erased, false=was not there
// Unsubscribes (by listener sequence) from the `server` stream.
4287bool
4289{
4291 return mStreamMaps[sServer].erase(uSeq);
4292}
4293
4294// <-- bool: true=added, false=already there
// Subscribes a listener to a transactions stream (signature 4296 and the
// lock/stream-map lines 4298-4299 were elided by the renderer — presumably
// mStreamMaps[sTransactions]; TODO confirm).
4295bool
4297{
4300 .emplace(isrListener->getSeq(), isrListener)
4301 .second;
4302}
4303
4304// <-- bool: true=erased, false=was not there
4305bool
4311
4312// <-- bool: true=added, false=already there
// Subscribes a listener to a stream — presumably the real-time
// transactions stream; signature and stream-map lines elided by the
// renderer — TODO confirm against the repository source.
4313bool
4315{
4318 .emplace(isrListener->getSeq(), isrListener)
4319 .second;
4320}
4321
4322// <-- bool: true=erased, false=was not there
4323bool
4329
4330// <-- bool: true=added, false=already there
// Subscribes a listener to a stream — presumably `validations` (the class
// index declares subValidations); signature and stream-map lines elided by
// the renderer — TODO confirm.
4331bool
4333{
4336 .emplace(isrListener->getSeq(), isrListener)
4337 .second;
4338}
4339
4340void
4345
4346// <-- bool: true=erased, false=was not there
4347bool
4353
4354// <-- bool: true=added, false=already there
// Subscribes a listener to the `peer_status` stream.
// Returns true when newly added, false when already subscribed.
4355bool
4357{
4359 return mStreamMaps[sPeerStatus]
4360 .emplace(isrListener->getSeq(), isrListener)
4361 .second;
4362}
4363
4364// <-- bool: true=erased, false=was not there
4365bool
4371
4372// <-- bool: true=added, false=already there
// Subscribes a listener to a stream — presumably `consensus`; the
// signature and stream-map selection were elided by the renderer — TODO
// confirm against the repository source.
4373bool
4375{
4378 .emplace(isrListener->getSeq(), isrListener)
4379 .second;
4380}
4381
4382// <-- bool: true=erased, false=was not there
4383bool
4389
// Looks up an RPC subscription entry by URL; returns an empty pointer when
// not found. (Return-type/signature lines 4390-4391 and, presumably, a
// lock on 4393 were elided by the renderer.)
4392{
4394
4395 subRpcMapType::iterator it = mRpcSubMap.find(strUrl);
4396
4397 if (it != mRpcSubMap.end())
4398 return it->second;
4399
4400 return InfoSub::pointer();
4401}
4402
// Registers an RPC subscription entry under its URL and returns the entry.
// (Signature lines 4403-4404 were elided by the renderer.)
4405{
4407
4408 mRpcSubMap.emplace(strUrl, rspEntry);
4409
4410 return rspEntry;
4411}
4412
// Removes an RPC subscription by URL, but only when no stream map still
// references its InfoSub sequence; returns true on removal, false when the
// entry is absent or still in use.
4413bool
4415{
4417 auto pInfo = findRpcSub(strUrl);
4418
4419 if (!pInfo)
4420 return false;
4421
4422 // check to see if any of the stream maps still hold a weak reference to
4423 // this entry before removing
4424 for (SubMapType const& map : mStreamMaps)
4425 {
4426 if (map.find(pInfo->getSeq()) != map.end())
4427 return false;
4428 }
4429 mRpcSubMap.erase(strUrl);
4430 return true;
4431}
4432
4433#ifndef USE_NEW_BOOK_PAGE
4434
4435// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4436// work, but it demonstrated poor performance.
4437//
// Builds the JSON `offers` array for one page of an order book (the active
// code path when USE_NEW_BOOK_PAGE is not defined). Walks the book's
// directory pages in quality order, computes per-offer funding by keeping a
// running owner-balance table (umBalance, declared on the elided line
// 4451), and appends every offer — funded or not — with `quality` and, for
// an owner's first offer, `owner_funds`.
// (Renderer elided: signature/lpLedger parameter 4439-4440, umBalance 4451,
// and the freeze flag argument 4556 to accountHolds.)
4438void
4441 Book const& book,
4442 AccountID const& uTakerID,
4443 bool const bProof,
4444 unsigned int iLimit,
4445 Json::Value const& jvMarker,
4446 Json::Value& jvResult)
4447{ // CAUTION: This is the old get book page logic
4448 Json::Value& jvOffers =
4449 (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4450
4452 uint256 const uBookBase = getBookBase(book);
4453 uint256 const uBookEnd = getQualityNext(uBookBase);
4454 uint256 uTipIndex = uBookBase;
4455
4456 if (auto stream = m_journal.trace())
4457 {
4458 stream << "getBookPage:" << book;
4459 stream << "getBookPage: uBookBase=" << uBookBase;
4460 stream << "getBookPage: uBookEnd=" << uBookEnd;
4461 stream << "getBookPage: uTipIndex=" << uTipIndex;
4462 }
4463
4464 ReadView const& view = *lpLedger;
4465
4466 bool const bGlobalFreeze = isGlobalFrozen(view, book.out.account) ||
4467 isGlobalFrozen(view, book.in.account);
4468
4469 bool bDone = false;
4470 bool bDirectAdvance = true;
4471
4472 std::shared_ptr<SLE const> sleOfferDir;
4473 uint256 offerIndex;
4474 unsigned int uBookEntry;
4475 STAmount saDirRate;
4476
4477 auto const rate = transferRate(view, book.out.account);
4478 auto viewJ = app_.journal("View");
4479
// Outer loop: one iteration per offer, bounded by iLimit; bDirectAdvance
// moves to the next directory page when the current one is exhausted.
4480 while (!bDone && iLimit-- > 0)
4481 {
4482 if (bDirectAdvance)
4483 {
4484 bDirectAdvance = false;
4485
4486 JLOG(m_journal.trace()) << "getBookPage: bDirectAdvance";
4487
4488 auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4489 if (ledgerIndex)
4490 sleOfferDir = view.read(keylet::page(*ledgerIndex));
4491 else
4492 sleOfferDir.reset();
4493
4494 if (!sleOfferDir)
4495 {
4496 JLOG(m_journal.trace()) << "getBookPage: bDone";
4497 bDone = true;
4498 }
4499 else
4500 {
4501 uTipIndex = sleOfferDir->key();
4502 saDirRate = amountFromQuality(getQuality(uTipIndex));
4503
4504 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4505
4506 JLOG(m_journal.trace())
4507 << "getBookPage: uTipIndex=" << uTipIndex;
4508 JLOG(m_journal.trace())
4509 << "getBookPage: offerIndex=" << offerIndex;
4510 }
4511 }
4512
4513 if (!bDone)
4514 {
4515 auto sleOffer = view.read(keylet::offer(offerIndex));
4516
4517 if (sleOffer)
4518 {
4519 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4520 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4521 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4522 STAmount saOwnerFunds;
4523 bool firstOwnerOffer(true);
4524
4525 if (book.out.account == uOfferOwnerID)
4526 {
4527 // If an offer is selling issuer's own IOUs, it is fully
4528 // funded.
4529 saOwnerFunds = saTakerGets;
4530 }
4531 else if (bGlobalFreeze)
4532 {
4533 // If either asset is globally frozen, consider all offers
4534 // that aren't ours to be totally unfunded
4535 saOwnerFunds.clear(book.out);
4536 }
4537 else
4538 {
4539 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4540 if (umBalanceEntry != umBalance.end())
4541 {
4542 // Found in running balance table.
4543
4544 saOwnerFunds = umBalanceEntry->second;
4545 firstOwnerOffer = false;
4546 }
4547 else
4548 {
4549 // Did not find balance in table.
4550
4551 saOwnerFunds = accountHolds(
4552 view,
4553 uOfferOwnerID,
4554 book.out.currency,
4555 book.out.account,
4557 viewJ);
4558
4559 if (saOwnerFunds < beast::zero)
4560 {
4561 // Treat negative funds as zero.
4562
4563 saOwnerFunds.clear();
4564 }
4565 }
4566 }
4567
4568 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4569
4570 STAmount saTakerGetsFunded;
4571 STAmount saOwnerFundsLimit = saOwnerFunds;
4572 Rate offerRate = parityRate;
4573
4574 if (rate != parityRate
4575 // Have a transfer fee.
4576 && uTakerID != book.out.account
4577 // Not taking offers of own IOUs.
4578 && book.out.account != uOfferOwnerID)
4579 // Offer owner not issuing own funds
4580 {
4581 // Need to charge a transfer fee to offer owner.
4582 offerRate = rate;
4583 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4584 }
4585
4586 if (saOwnerFundsLimit >= saTakerGets)
4587 {
4588 // Sufficient funds no shenanigans.
4589 saTakerGetsFunded = saTakerGets;
4590 }
4591 else
4592 {
4593 // Only provide, if not fully funded.
4594
4595 saTakerGetsFunded = saOwnerFundsLimit;
4596
4597 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4598 std::min(
4599 saTakerPays,
4600 multiply(
4601 saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4602 .setJson(jvOffer[jss::taker_pays_funded]);
4603 }
4604
4605 STAmount saOwnerPays = (parityRate == offerRate)
4606 ? saTakerGetsFunded
4607 : std::min(
4608 saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4609
// Deduct what this offer would consume so later offers by the same
// owner see the reduced balance.
4610 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4611
4612 // Include all offers funded and unfunded
4613 Json::Value& jvOf = jvOffers.append(jvOffer);
4614 jvOf[jss::quality] = saDirRate.getText();
4615
4616 if (firstOwnerOffer)
4617 jvOf[jss::owner_funds] = saOwnerFunds.getText();
4618 }
4619 else
4620 {
4621 JLOG(m_journal.warn()) << "Missing offer";
4622 }
4623
4624 if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4625 {
4626 bDirectAdvance = true;
4627 }
4628 else
4629 {
4630 JLOG(m_journal.trace())
4631 << "getBookPage: offerIndex=" << offerIndex;
4632 }
4633 }
4634 }
4635
4636 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4637 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4638}
4639
4640#else
4641
4642// This is the new code that uses the book iterators
4643// It has temporarily been disabled
4644
// Alternative book-page builder using OrderBookIterator (compiled only when
// USE_NEW_BOOK_PAGE is defined; per the comment above, currently disabled
// for performance reasons). Unlike the old path, it only emits offers that
// are funded or belong to the taker.
// (Renderer elided: signature/lpLedger 4646-4647, umBalance 4657, and the
// freeze-flag argument 4707 to accountHolds.)
4645void
4648 Book const& book,
4649 AccountID const& uTakerID,
4650 bool const bProof,
4651 unsigned int iLimit,
4652 Json::Value const& jvMarker,
4653 Json::Value& jvResult)
4654{
4655 auto& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4656
4658
4659 MetaView lesActive(lpLedger, tapNONE, true);
4660 OrderBookIterator obIterator(lesActive, book);
4661
4662 auto const rate = transferRate(lesActive, book.out.account);
4663
4664 bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.out.account) ||
4665 lesActive.isGlobalFrozen(book.in.account);
4666
4667 while (iLimit-- > 0 && obIterator.nextOffer())
4668 {
4669 SLE::pointer sleOffer = obIterator.getCurrentOffer();
4670 if (sleOffer)
4671 {
4672 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4673 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4674 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4675 STAmount saDirRate = obIterator.getCurrentRate();
4676 STAmount saOwnerFunds;
4677
4678 if (book.out.account == uOfferOwnerID)
4679 {
4680 // If offer is selling issuer's own IOUs, it is fully funded.
4681 saOwnerFunds = saTakerGets;
4682 }
4683 else if (bGlobalFreeze)
4684 {
4685 // If either asset is globally frozen, consider all offers
4686 // that aren't ours to be totally unfunded
4687 saOwnerFunds.clear(book.out);
4688 }
4689 else
4690 {
4691 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4692
4693 if (umBalanceEntry != umBalance.end())
4694 {
4695 // Found in running balance table.
4696
4697 saOwnerFunds = umBalanceEntry->second;
4698 }
4699 else
4700 {
4701 // Did not find balance in table.
4702
4703 saOwnerFunds = lesActive.accountHolds(
4704 uOfferOwnerID,
4705 book.out.currency,
4706 book.out.account,
4708
4709 if (saOwnerFunds.isNegative())
4710 {
4711 // Treat negative funds as zero.
4712
4713 saOwnerFunds.zero();
4714 }
4715 }
4716 }
4717
4718 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4719
4720 STAmount saTakerGetsFunded;
4721 STAmount saOwnerFundsLimit = saOwnerFunds;
4722 Rate offerRate = parityRate;
4723
4724 if (rate != parityRate
4725 // Have a transfer fee.
4726 && uTakerID != book.out.account
4727 // Not taking offers of own IOUs.
4728 && book.out.account != uOfferOwnerID)
4729 // Offer owner not issuing own funds
4730 {
4731 // Need to charge a transfer fee to offer owner.
4732 offerRate = rate;
4733 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4734 }
4735
4736 if (saOwnerFundsLimit >= saTakerGets)
4737 {
4738 // Sufficient funds no shenanigans.
4739 saTakerGetsFunded = saTakerGets;
4740 }
4741 else
4742 {
4743 // Only provide, if not fully funded.
4744 saTakerGetsFunded = saOwnerFundsLimit;
4745
4746 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4747
4748 // TODO(tom): The result of this expression is not used - what's
4749 // going on here?
4750 std::min(
4751 saTakerPays,
4752 multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4753 .setJson(jvOffer[jss::taker_pays_funded]);
4754 }
4755
4756 STAmount saOwnerPays = (parityRate == offerRate)
4757 ? saTakerGetsFunded
4758 : std::min(
4759 saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4760
4761 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4762
4763 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4764 {
4765 // Only provide funded offers and offers of the taker.
4766 Json::Value& jvOf = jvOffers.append(jvOffer);
4767 jvOf[jss::quality] = saDirRate.getText();
4768 }
4769 }
4770 }
4771
4772 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4773 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4774}
4775
4776#endif
4777
// Publishes per-operating-mode accounting to insight gauges: adds the time
// spent in the current mode so far, then reports cumulative duration and
// transition counts for each mode. (The gauge .set(...) call heads — lines
// 4786-4787, 4790, 4793, 4795, 4798, 4801, 4804, 4807, 4809, 4812 — were
// elided by the renderer; only their argument lines remain visible.)
4778inline void
4780{
4781 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
4782 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4784 counters[static_cast<std::size_t>(mode)].dur += current;
4785
4788 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
4789 .dur.count());
4791 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
4792 .dur.count());
4794 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4796 counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
4797 .dur.count());
4799 counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4800
4802 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
4803 .transitions);
4805 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
4806 .transitions);
4808 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4810 counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
4811 .transitions);
4813 counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4814}
4815
// Records an operating-mode transition under mutex_: bumps the target
// mode's transition count, captures initial sync time on the first-ever
// entry into FULL, accrues the time spent in the outgoing mode, then
// switches mode_ and restarts the interval clock.
4816void
4818{
4819 auto now = std::chrono::steady_clock::now();
4820
4821 std::lock_guard lock(mutex_);
4822 ++counters_[static_cast<std::size_t>(om)].transitions;
// First transition into FULL ever => record time-to-initial-sync
// measured from process start.
4823 if (om == OperatingMode::FULL &&
4824 counters_[static_cast<std::size_t>(om)].transitions == 1)
4825 {
4826 initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
4827 now - processStart_)
4828 .count();
4829 }
// Charge the elapsed interval to the mode we are leaving.
4830 counters_[static_cast<std::size_t>(mode_)].dur +=
4831 std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4832
4833 mode_ = om;
4834 start_ = now;
4835}
4836
// Serializes state-accounting counters into `obj`: per-mode transition
// counts and cumulative durations (with the in-progress interval added to
// the current mode), plus the current state's duration and, when set, the
// initial sync duration. (Lines 4838 signature, 4842, and 4846 — the for
// loop header start — were elided by the renderer.)
4837void
4839{
4840 auto [counters, mode, start, initialSync] = getCounterData();
4841 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4843 counters[static_cast<std::size_t>(mode)].dur += current;
4844
4845 obj[jss::state_accounting] = Json::objectValue;
4847 i <= static_cast<std::size_t>(OperatingMode::FULL);
4848 ++i)
4849 {
4850 obj[jss::state_accounting][states_[i]] = Json::objectValue;
4851 auto& state = obj[jss::state_accounting][states_[i]];
// Counters rendered as strings to avoid 64-bit JSON precision issues.
4852 state[jss::transitions] = std::to_string(counters[i].transitions);
4853 state[jss::duration_us] = std::to_string(counters[i].dur.count());
4854 }
4855 obj[jss::server_state_duration_us] = std::to_string(current.count());
4856 if (initialSync)
4857 obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4858}
4859
4860//------------------------------------------------------------------------------
4861
// Factory for the NetworkOPs implementation: forwards all construction
// parameters through (presumably to a NetworkOPsImp instance — the return
// type/name on 4862-4863, the clock/ledger-master parameters on 4865/4870,
// and the make_unique call on 4876 were elided by the renderer).
4864 Application& app,
4866 bool standalone,
4867 std::size_t minPeerCount,
4868 bool startvalid,
4869 JobQueue& job_queue,
4871 ValidatorKeys const& validatorKeys,
4872 boost::asio::io_context& io_svc,
4873 beast::Journal journal,
4874 beast::insight::Collector::ptr const& collector)
4875{
4877 app,
4878 clock,
4879 standalone,
4880 minPeerCount,
4881 startvalid,
4882 job_queue,
4884 validatorKeys,
4885 io_svc,
4886 journal,
4887 collector);
4888}
4889
4890} // namespace ripple
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:45
Represents a JSON value.
Definition json_value.h:131
Json::UInt UInt
Definition json_value.h:138
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
A metric for measuring an integral value.
Definition Gauge.h:21
void set(value_type value) const
Set the value on the gauge.
Definition Gauge.h:49
A reference to a handler for performing polled collection.
Definition Hook.h:13
A transaction that is in a closed ledger.
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
TxMeta const & getMeta() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual Config & config()=0
virtual Overlay & overlay()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual OpenLedger & openLedger()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NodeStore::Database & getNodeStore()=0
virtual ServerHandler & getServerHandler()=0
virtual std::chrono::milliseconds getIOLatency()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual TimeKeeper & timeKeeper()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual ManifestCache & validatorManifests()=0
virtual TxQ & getTxQ()=0
virtual perf::PerfLog & getPerfLog()=0
virtual Cluster & cluster()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Specifies an order book.
Definition Book.h:17
Issue in
Definition Book.h:19
Issue out
Definition Book.h:20
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::string const & name() const
Definition ClusterNode.h:27
std::uint32_t getLoadFee() const
Definition ClusterNode.h:33
NetClock::time_point getReportTime() const
Definition ClusterNode.h:39
PublicKey const & identity() const
Definition ClusterNode.h:45
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:30
uint32_t NETWORK_ID
Definition Config.h:137
std::string SERVER_DOMAIN
Definition Config.h:259
std::size_t NODE_SIZE
Definition Config.h:194
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
Definition Config.h:141
int RELAY_UNTRUSTED_VALIDATIONS
Definition Config.h:150
virtual void clearFailures()=0
virtual Json::Value getInfo()=0
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:35
AccountID account
Definition Issue.h:17
Currency currency
Definition Issue.h:16
A pool of threads to perform work.
Definition JobQueue.h:39
Json::Value getJson(int c=0)
Definition JobQueue.cpp:195
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
std::shared_ptr< Ledger const > getValidatedLedger()
bool haveValidated()
Whether we have ever fully validated a ledger.
std::shared_ptr< ReadView const > getCurrentLedger()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::size_t getFetchPackCacheSize() const
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
std::chrono::seconds getValidatedLedgerAge()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getLoadBase() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
Manages load sources.
Definition LoadManager.h:27
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
Definition Manifest.cpp:304
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void mode(OperatingMode om)
Record state transition.
void json(Json::Value &obj) const
Output state counters in JSON format.
std::array< Counters, 5 > counters_
std::chrono::steady_clock::time_point start_
static std::array< Json::StaticString const, 5 > const states_
std::chrono::steady_clock::time_point const processStart_
Transaction with input flags and results to be applied in batches.
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::shared_ptr< Transaction > const transaction
boost::asio::steady_timer accountHistoryTxTimer_
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
OperatingMode getOperatingMode() const override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
std::vector< TransactionStatus > mTransactions
bool unsubBookChanges(std::uint64_t uListener) override
std::atomic< OperatingMode > mMode
Json::Value getLedgerFetchInfo() override
bool isUNLBlocked() override
RCLConsensus mConsensus
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
void setNeedNetworkLedger() override
void setUNLBlocked() override
void pubConsensus(ConsensusPhase phase)
void transactionBatch()
Apply transactions in batches.
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
void setAmendmentBlocked() override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void clearUNLBlocked() override
boost::asio::steady_timer heartbeatTimer_
void updateLocalTx(ReadView const &view) override
bool unsubManifests(std::uint64_t uListener) override
DispatchState
Synchronization states for transaction batches.
std::optional< PublicKey > const validatorPK_
bool unsubTransactions(std::uint64_t uListener) override
void clearAmendmentWarned() override
std::size_t getLocalTxCount() override
std::unique_ptr< LocalTxs > m_localTX
bool subValidations(InfoSub::ref ispListener) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isAmendmentBlocked() override
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
SubAccountHistoryMapType mSubAccountHistory
Json::Value getServerInfo(bool human, bool admin, bool counters) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
boost::asio::steady_timer clusterTimer_
bool isAmendmentWarned() override
static std::array< char const *, 5 > const states_
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
std::atomic< bool > amendmentBlocked_
beast::Journal m_journal
SubInfoMapType mSubAccount
std::optional< PublicKey > const validatorMasterPK_
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
std::set< uint256 > pendingValidations_
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool unsubValidations(std::uint64_t uListener) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
bool unsubPeerStatus(std::uint64_t uListener) override
void pubValidation(std::shared_ptr< STValidation > const &val) override
std::size_t const minPeerCount_
std::atomic< bool > unlBlocked_
bool subBook(InfoSub::ref ispListener, Book const &) override
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
void stateAccounting(Json::Value &obj) override
void submitTransaction(std::shared_ptr< STTx const > const &) override
bool unsubRTTransactions(std::uint64_t uListener) override
Json::Value getConsensusInfo() override
std::recursive_mutex mSubLock
std::atomic< bool > needNetworkLedger_
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
StateAccounting accounting_
void reportConsensusStateChange(ConsensusPhase phase)
bool subConsensus(InfoSub::ref ispListener) override
bool isNeedNetworkLedger() override
void setAmendmentWarned() override
bool processTrustedProposal(RCLCxPeerPos proposal) override
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
bool subPeerStatus(InfoSub::ref ispListener) override
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool tryRemoveRpcSub(std::string const &strUrl) override
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
LedgerMaster & m_ledgerMaster
void clearLedgerFetch() override
bool isBlocked() override
void consensusViewChange() override
void setStateTimer() override
Called to initially start our timers.
bool subManifests(InfoSub::ref ispListener) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool unsubServer(std::uint64_t uListener) override
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
ServerFeeSummary mLastFeeSummary
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void setStandAlone() override
bool subRTTransactions(InfoSub::ref ispListener) override
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
std::condition_variable mCond
void setMode(OperatingMode om) override
void stop() override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
void clearNeedNetworkLedger() override
NetworkOPsImp(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
DispatchState mDispatchState
bool subBookChanges(InfoSub::ref ispListener) override
SubInfoMapType mSubRTAccount
void reportFeeChange() override
bool unsubBook(std::uint64_t uListener, Book const &) override
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
bool isFull() override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
std::mutex validationsMutex_
void pubManifest(Manifest const &) override
ConsensusPhase mLastConsensusPhase
bool subTransactions(InfoSub::ref ispListener) override
subRpcMapType mRpcSubMap
std::atomic< bool > amendmentWarned_
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool unsubLedger(std::uint64_t uListener) override
std::string getHostId(bool forAdmin)
bool unsubConsensus(std::uint64_t uListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:70
void getCountsJson(Json::Value &obj)
Definition Database.cpp:248
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:46
BookListeners::pointer getBookListeners(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::uint64_t getPeerDisconnectCharges() const =0
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
Json::Value getJson(bool full) const
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:44
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:488
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:32
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
void setJson(Json::Value &) const
Definition STAmount.cpp:624
std::string getText() const override
Definition STAmount.cpp:664
Issue const & issue() const
Definition STAmount.h:477
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
Definition Serializer.h:53
void const * data() const noexcept
Definition Serializer.h:59
void setup(Setup const &setup, beast::Journal journal)
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:45
std::chrono::seconds closeOffset() const
Definition TimeKeeper.h:64
time_point closeTime() const
Returns the predicted close time, in network time.
Definition TimeKeeper.h:57
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Definition TxQ.cpp:1757
static time_point now()
Validator keys and manifest as set in configuration file.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t quorum() const
Get quorum value for current trusted key set.
constexpr double decimalXRP() const
Definition XRPAmount.h:243
Json::Value jsonClipped() const
Definition XRPAmount.h:199
iterator begin()
Definition base_uint.h:117
static constexpr std::size_t size()
Definition base_uint.h:507
bool isZero() const
Definition base_uint.h:521
bool isNonZero() const
Definition base_uint.h:526
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:212
T clear(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_same_v
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
int Int
unsigned int UInt
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:15
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:49
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:34
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:28
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:165
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:361
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:255
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:50
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::unique_ptr< NetworkOPs > make_NetworkOPs(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startvalid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:95
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:74
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:842
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
Definition View.cpp:535
@ fhZERO_IF_FROZEN
Definition View.h:58
@ fhIGNORE_FREEZE
Definition View.h:58
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:130
@ rpcSUCCESS
Definition ErrorCodes.h:25
@ rpcINVALID_PARAMS
Definition ErrorCodes.h:65
@ rpcINTERNAL
Definition ErrorCodes.h:111
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
auto constexpr muldiv_max
Definition mulDiv.h:9
std::unique_ptr< LocalTxs > make_LocalTxs()
Definition LocalTxs.cpp:173
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:965
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
@ warnRPC_EXPIRED_VALIDATOR_LIST
Definition ErrorCodes.h:156
@ warnRPC_UNSUPPORTED_MAJORITY
Definition ErrorCodes.h:154
@ warnRPC_AMENDMENT_BLOCKED
Definition ErrorCodes.h:155
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:49
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
STAmount multiply(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:34
AccountID calcAccountID(PublicKey const &pk)
@ current
This was a new validation and was added.
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
Json::Value rpcError(int iError)
Definition RPCErr.cpp:12
@ tefPAST_SEQ
Definition TER.h:156
bool isTefFailure(TER x) noexcept
Definition TER.h:647
ConsensusPhase
Phases of consensus for a single ledger round.
static std::array< char const *, 5 > const stateNames
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
Definition View.cpp:743
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:158
bool isTerRetry(TER x) noexcept
Definition TER.h:653
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:56
@ tesSUCCESS
Definition TER.h:226
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:122
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j)
Definition View.cpp:368
bool isTesSuccess(TER x) noexcept
Definition TER.h:659
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
std::string to_string_iso(date::sys_time< Duration > tp)
Definition chrono.h:73
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
Definition View.cpp:126
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
FeeSetup setup_FeeVote(Section const &section)
Definition Config.cpp:1110
bool isTemMalformed(TER x) noexcept
Definition TER.h:641
Number root(Number f, unsigned d)
Definition Number.cpp:617
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
@ tapFAIL_HARD
Definition ApplyView.h:16
@ tapUNLIMITED
Definition ApplyView.h:23
@ tapNONE
Definition ApplyView.h:12
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
Definition View.cpp:137
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:25
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:57
constexpr std::size_t maxPoppedTransactions
@ terQUEUED
Definition TER.h:206
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:230
@ jtNETOP_CLUSTER
Definition Job.h:56
@ jtCLIENT_FEE_CHANGE
Definition Job.h:28
@ jtTRANSACTION
Definition Job.h:43
@ jtTXN_PROC
Definition Job.h:63
@ jtCLIENT_CONSENSUS
Definition Job.h:29
@ jtBATCH
Definition Job.h:46
@ jtCLIENT_ACCT_HIST
Definition Job.h:30
bool isTelLocal(TER x) noexcept
Definition TER.h:635
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:96
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:42
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Definition View.cpp:163
static std::uint32_t trunc32(std::uint64_t v)
@ temINVALID_FLAG
Definition TER.h:92
@ temBAD_SIGNATURE
Definition TER.h:86
static auto const genesisAccountId
STL namespace.
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
std::string serialized
The manifest in serialized form.
Definition Manifest.h:64
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:76
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:79
std::optional< Blob > getSignature() const
Returns manifest signature.
Definition Manifest.cpp:225
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:73
Blob getMasterSignature() const
Returns manifest master key signature.
Definition Manifest.cpp:236
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:67
Server fees published on server subscription.
bool operator!=(ServerFeeSummary const &b) const
std::optional< TxQ::Metrics > em
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge full_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Hook hook
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_transitions
beast::insight::Gauge full_duration
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Definition Rate.h:21
Data format for exchanging consumption information across peers.
Definition Gossip.h:13
std::vector< Item > items
Definition Gossip.h:25
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:146
IsMemberResult isMember(char const *key) const
void set(char const *key, auto const &v)
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:118
Sends a message to all peers.
Definition predicates.h:13
T swap(T... args)
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)