rippled
Loading...
Searching...
No Matches
NetworkOPs.cpp
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLValidations.h>
3#include <xrpld/app/ledger/AcceptedLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerMaster.h>
6#include <xrpld/app/ledger/LedgerToJson.h>
7#include <xrpld/app/ledger/LocalTxs.h>
8#include <xrpld/app/ledger/OpenLedger.h>
9#include <xrpld/app/ledger/OrderBookDB.h>
10#include <xrpld/app/ledger/TransactionMaster.h>
11#include <xrpld/app/main/LoadManager.h>
12#include <xrpld/app/main/Tuning.h>
13#include <xrpld/app/misc/AmendmentTable.h>
14#include <xrpld/app/misc/DeliverMax.h>
15#include <xrpld/app/misc/HashRouter.h>
16#include <xrpld/app/misc/LoadFeeTrack.h>
17#include <xrpld/app/misc/NetworkOPs.h>
18#include <xrpld/app/misc/Transaction.h>
19#include <xrpld/app/misc/TxQ.h>
20#include <xrpld/app/misc/ValidatorKeys.h>
21#include <xrpld/app/misc/ValidatorList.h>
22#include <xrpld/app/misc/detail/AccountTxPaging.h>
23#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
24#include <xrpld/app/tx/apply.h>
25#include <xrpld/consensus/Consensus.h>
26#include <xrpld/consensus/ConsensusParms.h>
27#include <xrpld/overlay/Cluster.h>
28#include <xrpld/overlay/Overlay.h>
29#include <xrpld/overlay/predicates.h>
30#include <xrpld/rpc/BookChanges.h>
31#include <xrpld/rpc/CTID.h>
32#include <xrpld/rpc/DeliveredAmount.h>
33#include <xrpld/rpc/MPTokenIssuanceID.h>
34#include <xrpld/rpc/ServerHandler.h>
35
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/mulDiv.h>
38#include <xrpl/basics/safe_cast.h>
39#include <xrpl/basics/scope.h>
40#include <xrpl/beast/utility/rngfill.h>
41#include <xrpl/core/PerfLog.h>
42#include <xrpl/crypto/RFC1751.h>
43#include <xrpl/crypto/csprng.h>
44#include <xrpl/protocol/BuildInfo.h>
45#include <xrpl/protocol/Feature.h>
46#include <xrpl/protocol/MultiApiJson.h>
47#include <xrpl/protocol/NFTSyntheticSerializer.h>
48#include <xrpl/protocol/RPCErr.h>
49#include <xrpl/protocol/TxFlags.h>
50#include <xrpl/protocol/jss.h>
51#include <xrpl/resource/Fees.h>
52#include <xrpl/resource/ResourceManager.h>
53
54#include <boost/asio/ip/host_name.hpp>
55#include <boost/asio/steady_timer.hpp>
56
57#include <algorithm>
58#include <exception>
59#include <mutex>
60#include <optional>
61#include <set>
62#include <sstream>
63#include <string>
64#include <tuple>
65#include <unordered_map>
66
67namespace xrpl {
68
69class NetworkOPsImp final : public NetworkOPs
70{
76 {
77 public:
79 bool const admin;
80 bool const local;
82 bool applied = false;
84
86 : transaction(t), admin(a), local(l), failType(f)
87 {
88 XRPL_ASSERT(
90 "xrpl::NetworkOPsImp::TransactionStatus::TransactionStatus : "
91 "valid inputs");
92 }
93 };
94
98 enum class DispatchState : unsigned char {
99 none,
100 scheduled,
101 running,
102 };
103
105
121 {
129
133 std::chrono::steady_clock::time_point start_ = std::chrono::steady_clock::now();
134 std::chrono::steady_clock::time_point const processStart_ = start_;
137
138 public:
140 {
141 counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions = 1;
142 }
143
150 void
152
158 void
159 json(Json::Value& obj) const;
160
162 {
164 decltype(mode_) mode;
165 decltype(start_) start;
167 };
168
171 {
174 }
175 };
176
179 {
180 ServerFeeSummary() = default;
181
182 ServerFeeSummary(XRPAmount fee, TxQ::Metrics&& escalationMetrics, LoadFeeTrack const& loadFeeTrack);
183 bool
184 operator!=(ServerFeeSummary const& b) const;
185
186 bool
188 {
189 return !(*this != b);
190 }
191
196 };
197
198public:
200 Application& app,
202 bool standalone,
203 std::size_t minPeerCount,
204 bool start_valid,
205 JobQueue& job_queue,
207 ValidatorKeys const& validatorKeys,
208 boost::asio::io_context& io_svc,
209 beast::Journal journal,
210 beast::insight::Collector::ptr const& collector)
211 : app_(app)
212 , m_journal(journal)
215 , heartbeatTimer_(io_svc)
216 , clusterTimer_(io_svc)
217 , accountHistoryTxTimer_(io_svc)
218 , mConsensus(
219 app,
220 make_FeeVote(setup_FeeVote(app_.config().section("voting")), app_.logs().journal("FeeVote")),
222 *m_localTX,
223 app.getInboundTransactions(),
224 beast::get_abstract_clock<std::chrono::steady_clock>(),
225 validatorKeys,
226 app_.logs().journal("LedgerConsensus"))
227 , validatorPK_(validatorKeys.keys ? validatorKeys.keys->publicKey : decltype(validatorPK_){})
228 , validatorMasterPK_(validatorKeys.keys ? validatorKeys.keys->masterPublicKey : decltype(validatorMasterPK_){})
230 , m_job_queue(job_queue)
231 , m_standalone(standalone)
232 , minPeerCount_(start_valid ? 0 : minPeerCount)
233 , m_stats(std::bind(&NetworkOPsImp::collect_metrics, this), collector)
234 {
235 }
236
237 ~NetworkOPsImp() override
238 {
239 // This clear() is necessary to ensure the shared_ptrs in this map get
240 // destroyed NOW because the objects in this map invoke methods on this
241 // class when they are destroyed
243 }
244
245public:
247 getOperatingMode() const override;
248
250 strOperatingMode(OperatingMode const mode, bool const admin) const override;
251
253 strOperatingMode(bool const admin = false) const override;
254
255 //
256 // Transaction operations.
257 //
258
259 // Must complete immediately.
260 void
262
263 void
264 processTransaction(std::shared_ptr<Transaction>& transaction, bool bUnlimited, bool bLocal, FailHard failType)
265 override;
266
267 void
268 processTransactionSet(CanonicalTXSet const& set) override;
269
278 void
279 doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType);
280
290 void
291 doTransactionAsync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failtype);
292
293private:
294 bool
296
297 void
300 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
301
302public:
306 void
308
314 void
316
317 //
318 // Owner functions.
319 //
320
322 getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account) override;
323
324 //
325 // Book functions.
326 //
327
328 void
331 Book const&,
332 AccountID const& uTakerID,
333 bool const bProof,
334 unsigned int iLimit,
335 Json::Value const& jvMarker,
336 Json::Value& jvResult) override;
337
338 // Ledger proposal/close functions.
339 bool
341
342 bool
343 recvValidation(std::shared_ptr<STValidation> const& val, std::string const& source) override;
344
345 void
346 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
347
348 // Network state machine.
349
350 // Used for the "jump" case.
351private:
352 void
354 bool
356
357public:
358 bool
359 beginConsensus(uint256 const& networkClosed, std::unique_ptr<std::stringstream> const& clog) override;
360 void
362 void
363 setStandAlone() override;
364
368 void
369 setStateTimer() override;
370
371 void
372 setNeedNetworkLedger() override;
373 void
374 clearNeedNetworkLedger() override;
375 bool
376 isNeedNetworkLedger() override;
377 bool
378 isFull() override;
379
380 void
381 setMode(OperatingMode om) override;
382
383 bool
384 isBlocked() override;
385 bool
386 isAmendmentBlocked() override;
387 void
388 setAmendmentBlocked() override;
389 bool
390 isAmendmentWarned() override;
391 void
392 setAmendmentWarned() override;
393 void
394 clearAmendmentWarned() override;
395 bool
396 isUNLBlocked() override;
397 void
398 setUNLBlocked() override;
399 void
400 clearUNLBlocked() override;
401 void
402 consensusViewChange() override;
403
405 getConsensusInfo() override;
407 getServerInfo(bool human, bool admin, bool counters) override;
408 void
409 clearLedgerFetch() override;
411 getLedgerFetchInfo() override;
414 void
415 reportFeeChange() override;
416 void
418
419 void
420 updateLocalTx(ReadView const& view) override;
422 getLocalTxCount() override;
423
424 //
425 // Monitoring: publisher side.
426 //
427 void
428 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
429 void
432 std::shared_ptr<STTx const> const& transaction,
433 TER result) override;
434 void
435 pubValidation(std::shared_ptr<STValidation> const& val) override;
436
437 //--------------------------------------------------------------------------
438 //
439 // InfoSub::Source.
440 //
441 void
442 subAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt) override;
443 void
444 unsubAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt) override;
445
446 // Just remove the subscription from the tracking
447 // not from the InfoSub. Needed for InfoSub destruction
448 void
449 unsubAccountInternal(std::uint64_t seq, hash_set<AccountID> const& vnaAccountIDs, bool rt) override;
450
452 subAccountHistory(InfoSub::ref ispListener, AccountID const& account) override;
453 void
454 unsubAccountHistory(InfoSub::ref ispListener, AccountID const& account, bool historyOnly) override;
455
456 void
457 unsubAccountHistoryInternal(std::uint64_t seq, AccountID const& account, bool historyOnly) override;
458
459 bool
460 subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
461 bool
462 unsubLedger(std::uint64_t uListener) override;
463
464 bool
465 subBookChanges(InfoSub::ref ispListener) override;
466 bool
467 unsubBookChanges(std::uint64_t uListener) override;
468
469 bool
470 subServer(InfoSub::ref ispListener, Json::Value& jvResult, bool admin) override;
471 bool
472 unsubServer(std::uint64_t uListener) override;
473
474 bool
475 subBook(InfoSub::ref ispListener, Book const&) override;
476 bool
477 unsubBook(std::uint64_t uListener, Book const&) override;
478
479 bool
480 subManifests(InfoSub::ref ispListener) override;
481 bool
482 unsubManifests(std::uint64_t uListener) override;
483 void
484 pubManifest(Manifest const&) override;
485
486 bool
487 subTransactions(InfoSub::ref ispListener) override;
488 bool
489 unsubTransactions(std::uint64_t uListener) override;
490
491 bool
492 subRTTransactions(InfoSub::ref ispListener) override;
493 bool
494 unsubRTTransactions(std::uint64_t uListener) override;
495
496 bool
497 subValidations(InfoSub::ref ispListener) override;
498 bool
499 unsubValidations(std::uint64_t uListener) override;
500
501 bool
502 subPeerStatus(InfoSub::ref ispListener) override;
503 bool
504 unsubPeerStatus(std::uint64_t uListener) override;
505 void
506 pubPeerStatus(std::function<Json::Value(void)> const&) override;
507
508 bool
509 subConsensus(InfoSub::ref ispListener) override;
510 bool
511 unsubConsensus(std::uint64_t uListener) override;
512
514 findRpcSub(std::string const& strUrl) override;
516 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
517 bool
518 tryRemoveRpcSub(std::string const& strUrl) override;
519
520 void
521 stop() override
522 {
523 {
524 try
525 {
526 heartbeatTimer_.cancel();
527 }
528 catch (boost::system::system_error const& e)
529 {
530 JLOG(m_journal.error()) << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
531 }
532
533 try
534 {
535 clusterTimer_.cancel();
536 }
537 catch (boost::system::system_error const& e)
538 {
539 JLOG(m_journal.error()) << "NetworkOPs: clusterTimer cancel error: " << e.what();
540 }
541
542 try
543 {
544 accountHistoryTxTimer_.cancel();
545 }
546 catch (boost::system::system_error const& e)
547 {
548 JLOG(m_journal.error()) << "NetworkOPs: accountHistoryTxTimer cancel error: " << e.what();
549 }
550 }
551 // Make sure that any waitHandlers pending in our timers are done.
552 using namespace std::chrono_literals;
553 waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
554 }
555
556 void
557 stateAccounting(Json::Value& obj) override;
558
559private:
560 void
561 setTimer(
562 boost::asio::steady_timer& timer,
563 std::chrono::milliseconds const& expiry_time,
564 std::function<void()> onExpire,
565 std::function<void()> onError);
566 void
568 void
570 void
572 void
574
576 transJson(
577 std::shared_ptr<STTx const> const& transaction,
578 TER result,
579 bool validated,
582
583 void
586 AcceptedLedgerTx const& transaction,
587 bool last);
588
589 void
592 AcceptedLedgerTx const& transaction,
593 bool last);
594
595 void
598 std::shared_ptr<STTx const> const& transaction,
599 TER result);
600
601 void
602 pubServer();
603 void
605
607 getHostId(bool forAdmin);
608
609private:
613
    /*
     * Using the validated ledger as the boundary between history and future,
     * the node streams historical txns with negative indexes starting from -1
     * and streams future txns with indexes starting from 0.
     * The SubAccountHistoryIndex struct maintains these indexes.
     * It also carries a stopHistorical_ flag used to stop streaming the
     * historical txns.
     */
657
661 void
663 void
665 void
667
670
672
674
676
681
683 boost::asio::steady_timer heartbeatTimer_;
684 boost::asio::steady_timer clusterTimer_;
685 boost::asio::steady_timer accountHistoryTxTimer_;
686
688
691
693
695
698
700
702
703 enum SubTypes {
704 sLedger, // Accepted ledgers.
705 sManifests, // Received validator manifests.
706 sServer, // When server changes connectivity state.
707 sTransactions, // All accepted transactions.
708 sRTTransactions, // All proposed and accepted transactions.
709 sValidations, // Received validations.
710 sPeerStatus, // Peer status changes.
711 sConsensusPhase, // Consensus phase
712 sBookChanges, // Per-ledger order book changes
713 sLastEntry // Any new entry must be ADDED ABOVE this one
714 };
715
717
719
721
722 // Whether we are in standalone mode.
723 bool const m_standalone;
724
725 // The number of nodes that we need to consider ourselves connected.
727
728 // Transaction batching.
733
735
738
739private:
740 struct Stats
741 {
742 template <class Handler>
743 Stats(Handler const& handler, beast::insight::Collector::ptr const& collector)
744 : hook(collector->make_hook(handler))
745 , disconnected_duration(collector->make_gauge("State_Accounting", "Disconnected_duration"))
746 , connected_duration(collector->make_gauge("State_Accounting", "Connected_duration"))
747 , syncing_duration(collector->make_gauge("State_Accounting", "Syncing_duration"))
748 , tracking_duration(collector->make_gauge("State_Accounting", "Tracking_duration"))
749 , full_duration(collector->make_gauge("State_Accounting", "Full_duration"))
750 , disconnected_transitions(collector->make_gauge("State_Accounting", "Disconnected_transitions"))
751 , connected_transitions(collector->make_gauge("State_Accounting", "Connected_transitions"))
752 , syncing_transitions(collector->make_gauge("State_Accounting", "Syncing_transitions"))
753 , tracking_transitions(collector->make_gauge("State_Accounting", "Tracking_transitions"))
754 , full_transitions(collector->make_gauge("State_Accounting", "Full_transitions"))
755 {
756 }
757
764
770 };
771
772 std::mutex m_statsMutex; // Mutex to lock m_stats
774
775private:
776 void
778};
779
780//------------------------------------------------------------------------------
781
782static std::array<char const*, 5> const stateNames{{"disconnected", "connected", "syncing", "tracking", "full"}};
783
785
792
793static auto const genesisAccountId =
795
796//------------------------------------------------------------------------------
797inline OperatingMode
799{
800 return mMode;
801}
802
// Convenience overload: report the server's *current* operating mode,
// forwarding to the (mode, admin) overload with the live mMode value.
inline std::string
NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
{
    return strOperatingMode(mMode, admin);
}
808
809inline void
814
815inline void
820
821inline void
826
827inline bool
832
833inline bool
838
841{
842 static std::string const hostname = boost::asio::ip::host_name();
843
844 if (forAdmin)
845 return hostname;
846
847 // For non-admin uses hash the node public key into a
848 // single RFC1751 word:
849 static std::string const shroudedHostId = [this]() {
850 auto const& id = app_.nodeIdentity();
851
852 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
853 }();
854
855 return shroudedHostId;
856}
857
858void
860{
862
863 // Only do this work if a cluster is configured
864 if (app_.cluster().size() != 0)
866}
867
868void
870 boost::asio::steady_timer& timer,
871 std::chrono::milliseconds const& expiry_time,
872 std::function<void()> onExpire,
873 std::function<void()> onError)
874{
875 // Only start the timer if waitHandlerCounter_ is not yet joined.
876 if (auto optionalCountedHandler =
877 waitHandlerCounter_.wrap([this, onExpire, onError](boost::system::error_code const& e) {
878 if ((e.value() == boost::system::errc::success) && (!m_job_queue.isStopped()))
879 {
880 onExpire();
881 }
882 // Recover as best we can if an unexpected error occurs.
883 if (e.value() != boost::system::errc::success && e.value() != boost::asio::error::operation_aborted)
884 {
885 // Try again later and hope for the best.
886 JLOG(m_journal.error()) << "Timer got error '" << e.message() << "'. Restarting timer.";
887 onError();
888 }
889 }))
890 {
891 timer.expires_after(expiry_time);
892 timer.async_wait(std::move(*optionalCountedHandler));
893 }
894}
895
896void
897NetworkOPsImp::setHeartbeatTimer()
898{
899 setTimer(
900 heartbeatTimer_,
901 mConsensus.parms().ledgerGRANULARITY,
902 [this]() { m_job_queue.addJob(jtNETOP_TIMER, "NetHeart", [this]() { processHeartbeatTimer(); }); },
903 [this]() { setHeartbeatTimer(); });
904}
905
906void
907NetworkOPsImp::setClusterTimer()
908{
909 using namespace std::chrono_literals;
910
911 setTimer(
912 clusterTimer_,
913 10s,
914 [this]() { m_job_queue.addJob(jtNETOP_CLUSTER, "NetCluster", [this]() { processClusterTimer(); }); },
915 [this]() { setClusterTimer(); });
916}
917
918void
919NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
920{
921 JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account " << toBase58(subInfo.index_->accountId_);
922 using namespace std::chrono_literals;
923 setTimer(
924 accountHistoryTxTimer_,
925 4s,
926 [this, subInfo]() { addAccountHistoryJob(subInfo); },
927 [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
928}
929
// Heartbeat: runs roughly every ledgerGRANULARITY (armed by
// setHeartbeatTimer). Checks peer connectivity, adjusts the operating
// mode, drives the consensus timer, and re-arms itself before returning.
void
NetworkOPsImp::processHeartbeatTimer()
{
    RclConsensusLogger clog("Heartbeat Timer", mConsensus.validating(), m_journal);
    {
        std::unique_lock lock{app_.getMasterMutex()};

        // VFALCO NOTE This is for diagnosing a crash on exit
        LoadManager& mgr(app_.getLoadManager());
        mgr.heartbeat();

        std::size_t const numPeers = app_.overlay().size();

        // do we have sufficient peers? If not, we are disconnected.
        if (numPeers < minPeerCount_)
        {
            if (mMode != OperatingMode::DISCONNECTED)
            {
                setMode(OperatingMode::DISCONNECTED);
                // NOTE(review): the declaration of 'ss' (presumably a
                // std::stringstream) appears to be elided in this capture.
                ss << "Node count (" << numPeers << ") has fallen "
                   << "below required minimum (" << minPeerCount_ << ").";
                JLOG(m_journal.warn()) << ss.str();
                CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
            }
            else
            {
                CLOG(clog.ss()) << "already DISCONNECTED. too few peers (" << numPeers << "), need at least "
                                << minPeerCount_;
            }

            // MasterMutex lock need not be held to call setHeartbeatTimer()
            lock.unlock();
            // We do not call mConsensus.timerEntry until there are enough
            // peers providing meaningful inputs to consensus
            setHeartbeatTimer();

            return;
        }

        if (mMode == OperatingMode::DISCONNECTED)
        {
            setMode(OperatingMode::CONNECTED);
            JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
            CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
        }

        // Check if the last validated ledger forces a change between these
        // states.
        // Re-asserting the current mode is not a no-op: setMode() can
        // demote/promote based on the state it inspects (see comment above).
        auto origMode = mMode.load();
        CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
        if (mMode == OperatingMode::SYNCING)
            setMode(OperatingMode::SYNCING);
        else if (mMode == OperatingMode::CONNECTED)
            setMode(OperatingMode::CONNECTED);
        auto newMode = mMode.load();
        if (origMode != newMode)
        {
            CLOG(clog.ss()) << ", changing to " << strOperatingMode(newMode, true);
        }
        CLOG(clog.ss()) << ". ";
    }

    // Advance consensus outside the master lock.
    mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());

    // Track and report consensus phase transitions.
    CLOG(clog.ss()) << "consensus phase " << to_string(mLastConsensusPhase);
    ConsensusPhase const currPhase = mConsensus.phase();
    if (mLastConsensusPhase != currPhase)
    {
        reportConsensusStateChange(currPhase);
        mLastConsensusPhase = currPhase;
        CLOG(clog.ss()) << " changed to " << to_string(mLastConsensusPhase);
    }
    CLOG(clog.ss()) << ". ";

    setHeartbeatTimer();
}
1007
1008void
1009NetworkOPsImp::processClusterTimer()
1010{
1011 if (app_.cluster().size() == 0)
1012 return;
1013
1014 using namespace std::chrono_literals;
1015
1016 bool const update = app_.cluster().update(
1017 app_.nodeIdentity().first,
1018 "",
1019 (m_ledgerMaster.getValidatedLedgerAge() <= 4min) ? app_.getFeeTrack().getLocalFee() : 0,
1020 app_.timeKeeper().now());
1021
1022 if (!update)
1023 {
1024 JLOG(m_journal.debug()) << "Too soon to send cluster update";
1025 setClusterTimer();
1026 return;
1027 }
1028
1029 protocol::TMCluster cluster;
1030 app_.cluster().for_each([&cluster](ClusterNode const& node) {
1031 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1032 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1033 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1034 n.set_nodeload(node.getLoadFee());
1035 if (!node.name().empty())
1036 n.set_nodename(node.name());
1037 });
1038
1039 Resource::Gossip gossip = app_.getResourceManager().exportConsumers();
1040 for (auto& item : gossip.items)
1041 {
1042 protocol::TMLoadSource& node = *cluster.add_loadsources();
1043 node.set_name(to_string(item.address));
1044 node.set_cost(item.balance);
1045 }
1046 app_.overlay().foreach(send_if(std::make_shared<Message>(cluster, protocol::mtCLUSTER), peer_in_cluster()));
1047 setClusterTimer();
1048}
1049
1050//------------------------------------------------------------------------------
1051
1053NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin) const
1054{
1055 if (mode == OperatingMode::FULL && admin)
1056 {
1057 auto const consensusMode = mConsensus.mode();
1058 if (consensusMode != ConsensusMode::wrongLedger)
1059 {
1060 if (consensusMode == ConsensusMode::proposing)
1061 return "proposing";
1062
1063 if (mConsensus.validating())
1064 return "validating";
1065 }
1066 }
1067
1068 return states_[static_cast<std::size_t>(mode)];
1069}
1070
1071void
1072NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
1073{
1074 if (isNeedNetworkLedger())
1075 {
1076 // Nothing we can do if we've never been in sync
1077 return;
1078 }
1079
1080 // Enforce Network bar for batch txn
1081 if (iTrans->isFlag(tfInnerBatchTxn) && m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1082 {
1083 JLOG(m_journal.error()) << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1084 return;
1085 }
1086
1087 // this is an asynchronous interface
1088 auto const trans = sterilize(*iTrans);
1089
1090 auto const txid = trans->getTransactionID();
1091 auto const flags = app_.getHashRouter().getFlags(txid);
1092
1093 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1094 {
1095 JLOG(m_journal.warn()) << "Submitted transaction cached bad";
1096 return;
1097 }
1098
1099 try
1100 {
1101 auto const [validity, reason] =
1102 checkValidity(app_.getHashRouter(), *trans, m_ledgerMaster.getValidatedRules(), app_.config());
1103
1104 if (validity != Validity::Valid)
1105 {
1106 JLOG(m_journal.warn()) << "Submitted transaction invalid: " << reason;
1107 return;
1108 }
1109 }
1110 catch (std::exception const& ex)
1111 {
1112 JLOG(m_journal.warn()) << "Exception checking transaction " << txid << ": " << ex.what();
1113
1114 return;
1115 }
1116
1117 std::string reason;
1118
1119 auto tx = std::make_shared<Transaction>(trans, reason, app_);
1120
1121 m_job_queue.addJob(jtTRANSACTION, "SubmitTxn", [this, tx]() {
1122 auto t = tx;
1123 processTransaction(t, false, false, FailHard::no);
1124 });
1125}
1126
1127bool
1128NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
1129{
1130 auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
1131
1132 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1133 {
1134 // cached bad
1135 JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
1136 transaction->setStatus(INVALID);
1137 transaction->setResult(temBAD_SIGNATURE);
1138 return false;
1139 }
1140
1141 auto const view = m_ledgerMaster.getCurrentLedger();
1142
1143 // This function is called by several different parts of the codebase
1144 // under no circumstances will we ever accept an inner txn within a batch
1145 // txn from the network.
1146 auto const sttx = *transaction->getSTransaction();
1147 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1148 {
1149 transaction->setStatus(INVALID);
1150 transaction->setResult(temINVALID_FLAG);
1151 app_.getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1152 return false;
1153 }
1154
1155 // NOTE ximinez - I think this check is redundant,
1156 // but I'm not 100% sure yet.
1157 // If so, only cost is looking up HashRouter flags.
1158 auto const [validity, reason] = checkValidity(app_.getHashRouter(), sttx, view->rules(), app_.config());
1159 XRPL_ASSERT(validity == Validity::Valid, "xrpl::NetworkOPsImp::processTransaction : valid validity");
1160
1161 // Not concerned with local checks at this point.
1162 if (validity == Validity::SigBad)
1163 {
1164 JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
1165 transaction->setStatus(INVALID);
1166 transaction->setResult(temBAD_SIGNATURE);
1167 app_.getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1168 return false;
1169 }
1170
1171 // canonicalize can change our pointer
1172 app_.getMasterTransaction().canonicalize(&transaction);
1173
1174 return true;
1175}
1176
1177void
1178NetworkOPsImp::processTransaction(
1179 std::shared_ptr<Transaction>& transaction,
1180 bool bUnlimited,
1181 bool bLocal,
1182 FailHard failType)
1183{
1184 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
1185
1186 // preProcessTransaction can change our pointer
1187 if (!preProcessTransaction(transaction))
1188 return;
1189
1190 if (bLocal)
1191 doTransactionSync(transaction, bUnlimited, failType);
1192 else
1193 doTransactionAsync(transaction, bUnlimited, failType);
1194}
1195
1196void
1197NetworkOPsImp::doTransactionAsync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType)
1198{
1199 std::lock_guard lock(mMutex);
1200
1201 if (transaction->getApplying())
1202 return;
1203
1204 mTransactions.push_back(TransactionStatus(transaction, bUnlimited, false, failType));
1205 transaction->setApplying();
1206
1207 if (mDispatchState == DispatchState::none)
1208 {
1209 if (m_job_queue.addJob(jtBATCH, "TxBatchAsync", [this]() { transactionBatch(); }))
1210 {
1211 mDispatchState = DispatchState::scheduled;
1212 }
1213 }
1214}
1215
1216void
1217NetworkOPsImp::doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType)
1218{
1219 std::unique_lock<std::mutex> lock(mMutex);
1220
1221 if (!transaction->getApplying())
1222 {
1223 mTransactions.push_back(TransactionStatus(transaction, bUnlimited, true, failType));
1224 transaction->setApplying();
1225 }
1226
1227 doTransactionSyncBatch(
1228 lock, [&transaction](std::unique_lock<std::mutex> const&) { return transaction->getApplying(); });
1229}
1230
// Drive batched transaction application while holding the batch mutex.
// If another thread is already running a batch job, wait on mCond for it
// to finish; otherwise run apply() directly, scheduling a follow-up
// jtBATCH job when more transactions remain. Repeats until
// retryCallback(lock) returns false.
void
NetworkOPsImp::doTransactionSyncBatch(
    // NOTE(review): the first parameter (apparently
    // std::unique_lock<std::mutex>& lock, used throughout the body) is
    // elided in this capture of the file.
    std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
{
    do
    {
        if (mDispatchState == DispatchState::running)
        {
            // A batch processing job is already running, so wait.
            mCond.wait(lock);
        }
        else
        {
            apply(lock);

            if (mTransactions.size())
            {
                // More transactions need to be applied, but by another job.
                if (m_job_queue.addJob(jtBATCH, "TxBatchSync", [this]() { transactionBatch(); }))
                {
                    mDispatchState = DispatchState::scheduled;
                }
            }
        }
    } while (retryCallback(lock));
}
1258
// Process an entire canonical transaction set (e.g. recovered or relayed
// as a batch): pre-check each transaction, queue the survivors, and apply
// batches on this thread until none of them is still pending.
void
NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
{
    auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
    // NOTE(review): the declaration of 'candidates' (presumably a
    // std::vector<std::shared_ptr<Transaction>>) is elided in this capture.
    candidates.reserve(set.size());
    for (auto const& [_, tx] : set)
    {
        std::string reason;
        auto transaction = std::make_shared<Transaction>(tx, reason, app_);

        if (transaction->getStatus() == INVALID)
        {
            if (!reason.empty())
            {
                JLOG(m_journal.trace()) << "Exception checking transaction: " << reason;
            }
            // Remember the failure so we never look at this txn again.
            app_.getHashRouter().setFlags(tx->getTransactionID(), HashRouterFlags::BAD);
            continue;
        }

        // preProcessTransaction can change our pointer
        if (!preProcessTransaction(transaction))
            continue;

        candidates.emplace_back(transaction);
    }

    // Stage a queue entry for every candidate not already being applied.
    std::vector<TransactionStatus> transactions;
    transactions.reserve(candidates.size());

    std::unique_lock lock(mMutex);

    for (auto& transaction : candidates)
    {
        if (!transaction->getApplying())
        {
            transactions.emplace_back(transaction, false, false, FailHard::no);
            transaction->setApplying();
        }
    }

    // Merge into the shared batch queue; swap is the cheap fast path when
    // the queue is currently empty.
    if (mTransactions.empty())
        mTransactions.swap(transactions);
    else
    {
        mTransactions.reserve(mTransactions.size() + transactions.size());
        for (auto& t : transactions)
            mTransactions.push_back(std::move(t));
    }
    if (mTransactions.empty())
    {
        JLOG(m_journal.debug()) << "No transaction to process!";
        return;
    }

    // Apply batches here until nothing queued is still marked applying.
    doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
        XRPL_ASSERT(lock.owns_lock(), "xrpl::NetworkOPsImp::processTransactionSet has lock");
        return std::any_of(
            mTransactions.begin(), mTransactions.end(), [](auto const& t) { return t.transaction->getApplying(); });
    });
}
1321
1322void
1323NetworkOPsImp::transactionBatch()
1324{
1325 std::unique_lock<std::mutex> lock(mMutex);
1326
1327 if (mDispatchState == DispatchState::running)
1328 return;
1329
1330 while (mTransactions.size())
1331 {
1332 apply(lock);
1333 }
1334}
1335
void
NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
    // Take ownership of the pending batch and apply each transaction to the
    // open ledger. Entered with `batchLock` (guarding mTransactions and
    // mDispatchState) held; the lock is released while the slow ledger work
    // runs and re-acquired before shared queue state is updated at the end.
    std::vector<TransactionStatus> transactions;
    mTransactions.swap(transactions);
    XRPL_ASSERT(!transactions.empty(), "xrpl::NetworkOPsImp::apply : non-empty transactions");
    XRPL_ASSERT(mDispatchState != DispatchState::running, "xrpl::NetworkOPsImp::apply : is not running");

    mDispatchState = DispatchState::running;

    // Do not hold the batch lock across ledger modification.
    batchLock.unlock();

    {
        // Lock master and ledger mutexes together (std::lock) to avoid
        // deadlock with other lock orderings.
        std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
        bool changed = false;
        {
            std::unique_lock ledgerLock{m_ledgerMaster.peekMutex(), std::defer_lock};
            std::lock(masterLock, ledgerLock);

            app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
                for (TransactionStatus& e : transactions)
                {
                    // we check before adding to the batch
                    ApplyFlags flags = tapNONE;
                    if (e.admin)
                        flags |= tapUNLIMITED;

                    if (e.failType == FailHard::yes)
                        flags |= tapFAIL_HARD;

                    auto const result = app_.getTxQ().apply(app_, view, e.transaction->getSTransaction(), flags, j);
                    e.result = result.ter;
                    e.applied = result.applied;
                    // Any applied transaction dirties the open ledger.
                    changed = changed || result.applied;
                }
                return changed;
            });
        }
        if (changed)
            reportFeeChange();

        std::optional<LedgerIndex> validatedLedgerIndex;
        if (auto const l = m_ledgerMaster.getValidatedLedger())
            validatedLedgerIndex = l->header().seq;

        auto newOL = app_.openLedger().current();
        // Post-process each transaction: publish, set status, decide whether
        // to hold, keep locally, and/or relay it to peers.
        for (TransactionStatus& e : transactions)
        {
            e.transaction->clearSubmitResult();

            if (e.applied)
            {
                pubProposedTransaction(newOL, e.transaction->getSTransaction(), e.result);
                e.transaction->setApplied();
            }

            e.transaction->setResult(e.result);

            // A malformed transaction can never succeed; flag it so it is
            // not processed again.
            if (isTemMalformed(e.result))
                app_.getHashRouter().setFlags(e.transaction->getID(), HashRouterFlags::BAD);

#ifdef DEBUG
            if (e.result != tesSUCCESS)
            {
                std::string token, human;

                if (transResultInfo(e.result, token, human))
                {
                    JLOG(m_journal.info()) << "TransactionResult: " << token << ": " << human;
                }
            }
#endif

            bool addLocal = e.local;

            if (e.result == tesSUCCESS)
            {
                JLOG(m_journal.debug()) << "Transaction is now included in open ledger";
                e.transaction->setStatus(INCLUDED);

                // Pop as many "reasonable" transactions for this account as
                // possible. "Reasonable" means they have sequential sequence
                // numbers, or use tickets.
                auto const& txCur = e.transaction->getSTransaction();

                std::size_t count = 0;
                for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur); txNext && count < maxPoppedTransactions;
                     txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
                {
                    // Re-take the batch lock while touching submit_held.
                    if (!batchLock.owns_lock())
                        batchLock.lock();
                    std::string reason;
                    auto const trans = sterilize(*txNext);
                    auto t = std::make_shared<Transaction>(trans, reason, app_);
                    if (t->getApplying())
                        break;
                    submit_held.emplace_back(t, false, false, FailHard::no);
                    t->setApplying();
                }
                if (batchLock.owns_lock())
                    batchLock.unlock();
            }
            else if (e.result == tefPAST_SEQ)
            {
                // duplicate or conflict
                JLOG(m_journal.info()) << "Transaction is obsolete";
                e.transaction->setStatus(OBSOLETE);
            }
            else if (e.result == terQUEUED)
            {
                JLOG(m_journal.debug()) << "Transaction is likely to claim a"
                                        << " fee, but is queued until fee drops";

                e.transaction->setStatus(HELD);
                // Add to held transactions, because it could get
                // kicked out of the queue, and this will try to
                // put it back.
                m_ledgerMaster.addHeldTransaction(e.transaction);
                e.transaction->setQueued();
                e.transaction->setKept();
            }
            else if (isTerRetry(e.result) || isTelLocal(e.result) || isTefFailure(e.result))
            {
                if (e.failType != FailHard::yes)
                {
                    auto const lastLedgerSeq = e.transaction->getSTransaction()->at(~sfLastLedgerSequence);
                    auto const ledgersLeft = lastLedgerSeq ? *lastLedgerSeq - m_ledgerMaster.getCurrentLedgerIndex()
                    // If any of these conditions are met, the transaction can
                    // be held:
                    // 1. It was submitted locally. (Note that this flag is only
                    //    true on the initial submission.)
                    // 2. The transaction has a LastLedgerSequence, and the
                    //    LastLedgerSequence is fewer than LocalTxs::holdLedgers
                    //    (5) ledgers into the future. (Remember that an
                    //    unseated optional compares as less than all seated
                    //    values, so it has to be checked explicitly first.)
                    // 3. The HashRouterFlags::BAD flag is not set on the txID.
                    //    (setFlags
                    //    checks before setting. If the flag is set, it returns
                    //    false, which means it's been held once without one of
                    //    the other conditions, so don't hold it again. Time's
                    //    up!)
                    //
                    if (e.local || (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
                        app_.getHashRouter().setFlags(e.transaction->getID(), HashRouterFlags::HELD))
                    {
                        // transaction should be held
                        JLOG(m_journal.debug()) << "Transaction should be held: " << e.result;
                        e.transaction->setStatus(HELD);
                        m_ledgerMaster.addHeldTransaction(e.transaction);
                        e.transaction->setKept();
                    }
                    else
                        JLOG(m_journal.debug())
                            << "Not holding transaction " << e.transaction->getID() << ": "
                            << (e.local ? "local" : "network") << ", "
                            << "result: " << e.result
                            << " ledgers left: " << (ledgersLeft ? to_string(*ledgersLeft) : "unspecified");
                }
            }
            else
            {
                JLOG(m_journal.debug()) << "Status other than success " << e.result;
                e.transaction->setStatus(INVALID);
            }

            // fail_hard submissions that didn't succeed are neither kept
            // locally nor relayed.
            auto const enforceFailHard = e.failType == FailHard::yes && !isTesSuccess(e.result);

            if (addLocal && !enforceFailHard)
            {
                m_localTX->push_back(m_ledgerMaster.getCurrentLedgerIndex(), e.transaction->getSTransaction());
                e.transaction->setKept();
            }

            // Relay if applied, queued, or (when not FULL) a locally
            // submitted non-fail_hard transaction.
            if ((e.applied || ((mMode != OperatingMode::FULL) && (e.failType != FailHard::yes) && e.local) ||
                 (e.result == terQUEUED)) &&
                !enforceFailHard)
            {
                auto const toSkip = app_.getHashRouter().shouldRelay(e.transaction->getID());
                if (auto const sttx = *(e.transaction->getSTransaction()); toSkip &&
                    // Skip relaying if it's an inner batch txn. The flag should
                    // only be set if the Batch feature is enabled. If Batch is
                    // not enabled, the flag is always invalid, so don't relay
                    // it regardless.
                    !(sttx.isFlag(tfInnerBatchTxn)))
                {
                    protocol::TMTransaction tx;
                    Serializer s;

                    sttx.add(s);
                    tx.set_rawtransaction(s.data(), s.size());
                    tx.set_status(protocol::tsCURRENT);
                    tx.set_receivetimestamp(app_.timeKeeper().now().time_since_epoch().count());
                    tx.set_deferred(e.result == terQUEUED);
                    // FIXME: This should be when we received it
                    app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
                    e.transaction->setBroadcast();
                }
            }

            if (validatedLedgerIndex)
            {
                auto [fee, accountSeq, availableSeq] =
                    app_.getTxQ().getTxRequiredFeeAndSeq(*newOL, e.transaction->getSTransaction());
                e.transaction->setCurrentLedgerState(*validatedLedgerIndex, fee, accountSeq, availableSeq);
            }
        }
    }

    // Re-acquire the batch lock before touching shared queue state again.
    batchLock.lock();

    for (TransactionStatus& e : transactions)
        e.transaction->clearApplying();

    // Requeue any account transactions popped above for the next pass.
    if (!submit_held.empty())
    {
        if (mTransactions.empty())
            mTransactions.swap(submit_held);
        else
        {
            mTransactions.reserve(mTransactions.size() + submit_held.size());
            for (auto& e : submit_held)
                mTransactions.push_back(std::move(e));
        }
    }

    mCond.notify_all();

    mDispatchState = DispatchState::none;
}
1568
1569//
1570// Owner functions
1571//
1572
NetworkOPsImp::getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account)
{
    // Walk the account's owner directory in the given ledger and render each
    // owned entry (offers and trust lines) as JSON, grouped by type.
    Json::Value jvObjects(Json::objectValue);
    auto root = keylet::ownerDir(account);
    auto sleNode = lpLedger->read(keylet::page(root));
    if (sleNode)
    {
        std::uint64_t uNodeDir;

        do
        {
            for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
            {
                auto sleCur = lpLedger->read(keylet::child(uDirEntry));
                XRPL_ASSERT(sleCur, "xrpl::NetworkOPsImp::getOwnerInfo : non-null child SLE");

                switch (sleCur->getType())
                {
                    case ltOFFER:
                        // Create the "offers" array lazily on first hit.
                        if (!jvObjects.isMember(jss::offers))
                            jvObjects[jss::offers] = Json::Value(Json::arrayValue);

                        jvObjects[jss::offers].append(sleCur->getJson(JsonOptions::none));
                        break;

                    case ltRIPPLE_STATE:
                        // Create the "ripple_lines" array lazily on first hit.
                        if (!jvObjects.isMember(jss::ripple_lines))
                        {
                            jvObjects[jss::ripple_lines] = Json::Value(Json::arrayValue);
                        }

                        jvObjects[jss::ripple_lines].append(sleCur->getJson(JsonOptions::none));
                        break;

                    case ltACCOUNT_ROOT:
                    case ltDIR_NODE:
                        // LCOV_EXCL_START
                    default:
                        // An owner directory should only reference offers and
                        // trust lines; anything else indicates corruption.
                        UNREACHABLE(
                            "xrpl::NetworkOPsImp::getOwnerInfo : invalid "
                            "type");
                        break;
                        // LCOV_EXCL_STOP
                }
            }

            // Follow the directory chain to the next page, if any.
            uNodeDir = sleNode->getFieldU64(sfIndexNext);

            if (uNodeDir)
            {
                sleNode = lpLedger->read(keylet::page(root, uNodeDir));
                XRPL_ASSERT(sleNode, "xrpl::NetworkOPsImp::getOwnerInfo : read next page");
            }
        } while (uNodeDir);
    }

    return jvObjects;
}
1632
1633//
1634// Other
1635//
1636
1637inline bool
1638NetworkOPsImp::isBlocked()
1639{
1640 return isAmendmentBlocked() || isUNLBlocked();
1641}
1642
inline bool
NetworkOPsImp::isAmendmentBlocked()
{
    // Whether this server has become amendment blocked.
    return amendmentBlocked_;
}
1648
void
NetworkOPsImp::setAmendmentBlocked()
{
    // Mark the server amendment blocked, then drop the operating mode back
    // to CONNECTED (setMode is called after the flag so isBlocked() holds).
    amendmentBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1655
1656inline bool
1657NetworkOPsImp::isAmendmentWarned()
1658{
1659 return !amendmentBlocked_ && amendmentWarned_;
1660}
1661
inline void
NetworkOPsImp::setAmendmentWarned()
{
    // Raise the amendment warning flag (cleared via clearAmendmentWarned).
    amendmentWarned_ = true;
}
1667
inline void
NetworkOPsImp::clearAmendmentWarned()
{
    // Lower the amendment warning flag.
    amendmentWarned_ = false;
}
1673
inline bool
NetworkOPsImp::isUNLBlocked()
{
    // Whether this server is blocked due to its validator list (UNL) state.
    return unlBlocked_;
}
1679
void
NetworkOPsImp::setUNLBlocked()
{
    // Mark the server UNL blocked, then drop the operating mode back to
    // CONNECTED (flag is set first so isBlocked() holds inside setMode).
    unlBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1686
inline void
NetworkOPsImp::clearUNLBlocked()
{
    // Clear the UNL blocked flag (the mode is not restored here).
    unlBlocked_ = false;
}
1692
bool
NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint256& networkClosed)
{
    // Returns true if there's an *abnormal* ledger issue, normal changing in
    // TRACKING mode should return false. Do we have sufficient validations for
    // our last closed ledger? Or do sufficient nodes agree? And do we have no
    // better ledger available? If so, we are either tracking or full.

    JLOG(m_journal.trace()) << "NetworkOPsImp::checkLastClosedLedger";

    auto const ourClosed = m_ledgerMaster.getClosedLedger();

    if (!ourClosed)
        return false;

    uint256 closedLedger = ourClosed->header().hash;
    uint256 prevClosedLedger = ourClosed->header().parentHash;
    JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
    JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;

    //-------------------------------------------------------------------------
    // Determine preferred last closed ledger

    auto& validations = app_.getValidations();
    JLOG(m_journal.debug()) << "ValidationTrie " << Json::Compact(validations.getJsonTrie());

    // Will rely on peer LCL if no trusted validations exist
    // Count our own LCL only when we are at least TRACKING.
    peerCounts[closedLedger] = 0;
    if (mMode >= OperatingMode::TRACKING)
        peerCounts[closedLedger]++;

    // Tally each peer's reported last closed ledger.
    for (auto& peer : peerList)
    {
        uint256 peerLedger = peer->getClosedLedgerHash();

        if (peerLedger.isNonZero())
            ++peerCounts[peerLedger];
    }

    for (auto const& it : peerCounts)
        JLOG(m_journal.debug()) << "L: " << it.first << " n=" << it.second;

    uint256 preferredLCL = validations.getPreferredLCL(
        RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
        m_ledgerMaster.getValidLedgerIndex(),
        peerCounts);

    bool switchLedgers = preferredLCL != closedLedger;
    if (switchLedgers)
        closedLedger = preferredLCL;
    //-------------------------------------------------------------------------
    if (switchLedgers && (closedLedger == prevClosedLedger))
    {
        // don't switch to our own previous ledger
        JLOG(m_journal.info()) << "We won't switch to our own previous ledger";
        networkClosed = ourClosed->header().hash;
        switchLedgers = false;
    }
    else
        networkClosed = closedLedger;

    if (!switchLedgers)
        return false;

    // Try to fetch the preferred ledger locally, then from peers.
    auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);

    if (!consensus)
        consensus = app_.getInboundLedgers().acquire(closedLedger, 0, InboundLedger::Reason::CONSENSUS);

    if (consensus &&
        (!m_ledgerMaster.canBeCurrent(consensus) ||
         !m_ledgerMaster.isCompatible(*consensus, m_journal.debug(), "Not switching")))
    {
        // Don't switch to a ledger not on the validated chain
        // or with an invalid close time or sequence
        networkClosed = ourClosed->header().hash;
        return false;
    }

    JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
    JLOG(m_journal.info()) << "Our LCL: " << ourClosed->header().hash << getJson({*ourClosed, {}});
    JLOG(m_journal.info()) << "Net LCL " << closedLedger;

    // We are out of sync with the network; demote ourselves.
    if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
    {
        setMode(OperatingMode::CONNECTED);
    }

    if (consensus)
    {
        // FIXME: If this rewinds the ledger sequence, or has the same
        // sequence, we should update the status on any stored transactions
        // in the invalidated ledgers.
        switchLastClosedLedger(consensus);
    }

    return true;
}
1792
void
NetworkOPsImp::switchLastClosedLedger(std::shared_ptr<Ledger const> const& newLCL)
{
    // set the newLCL as our last closed ledger -- this is abnormal code
    JLOG(m_journal.error()) << "JUMP last closed ledger to " << newLCL->header().hash;

    clearNeedNetworkLedger();

    // Update fee computations.
    app_.getTxQ().processClosedLedger(app_, *newLCL, true);

    // Caller must own master lock
    {
        // Apply tx in old open ledger to new
        // open ledger. Then apply local tx.

        auto retries = m_localTX->getTxSet();
        auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
        // Use the last validated ledger's rules when available; otherwise
        // fall back to the configured feature set.
        if (lastVal)
            rules = makeRulesGivenLedger(*lastVal, app_.config().features);
        else
            rules.emplace(app_.config().features);
        app_.openLedger().accept(
            app_,
            *rules,
            newLCL,
            OrderedTxs({}),
            false,
            retries,
            tapNONE,
            "jump",
            [&](OpenView& view, beast::Journal j) {
                // Stuff the ledger with transactions from the queue.
                return app_.getTxQ().accept(app_, view);
            });
    }

    m_ledgerMaster.switchLCL(newLCL);

    // Tell peers we jumped to a different last closed ledger.
    protocol::TMStatusChange s;
    s.set_newevent(protocol::neSWITCHED_LEDGER);
    s.set_ledgerseq(newLCL->header().seq);
    s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
    s.set_ledgerhashprevious(newLCL->header().parentHash.begin(), newLCL->header().parentHash.size());
    s.set_ledgerhash(newLCL->header().hash.begin(), newLCL->header().hash.size());

    app_.overlay().foreach(send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
}
1842
bool
NetworkOPsImp::beginConsensus(uint256 const& networkClosed, std::unique_ptr<std::stringstream> const& clog)
{
    // Start a new consensus round on top of the current ledger's parent.
    // Returns false if the prior (parent) ledger is not available locally.
    XRPL_ASSERT(networkClosed.isNonZero(), "xrpl::NetworkOPsImp::beginConsensus : nonzero input");

    auto closingInfo = m_ledgerMaster.getCurrentLedger()->header();

    JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq << " with LCL " << closingInfo.parentHash;

    auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);

    if (!prevLedger)
    {
        // this shouldn't happen unless we jump ledgers
        if (mMode == OperatingMode::FULL)
        {
            JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
            setMode(OperatingMode::TRACKING);
            CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
        }

        CLOG(clog) << "beginConsensus no previous ledger. ";
        return false;
    }

    XRPL_ASSERT(
        prevLedger->header().hash == closingInfo.parentHash,
        "xrpl::NetworkOPsImp::beginConsensus : prevLedger hash matches "
        "parent");
    XRPL_ASSERT(
        closingInfo.parentHash == m_ledgerMaster.getClosedLedger()->header().hash,
        "xrpl::NetworkOPsImp::beginConsensus : closedLedger parent matches "
        "hash");

    // Refresh the trusted validator set before starting the round.
    app_.validators().setNegativeUNL(prevLedger->negativeUNL());
    TrustChanges const changes = app_.validators().updateTrusted(
        app_.getValidations().getCurrentNodeIDs(),
        closingInfo.parentCloseTime,
        *this,
        app_.overlay(),
        app_.getHashRouter());

    if (!changes.added.empty() || !changes.removed.empty())
    {
        app_.getValidations().trustChanged(changes.added, changes.removed);
        // Update the AmendmentTable so it tracks the current validators.
        app_.getAmendmentTable().trustChanged(app_.validators().getQuorumKeys().second);
    }

    mConsensus.startRound(
        app_.timeKeeper().closeTime(), networkClosed, prevLedger, changes.removed, changes.added, clog);

    // Publish a consensus-phase change if starting the round moved it.
    ConsensusPhase const currPhase = mConsensus.phase();
    if (mLastConsensusPhase != currPhase)
    {
        reportConsensusStateChange(currPhase);
        mLastConsensusPhase = currPhase;
    }

    JLOG(m_journal.debug()) << "Initiating consensus engine";
    return true;
}
1905
1906bool
1907NetworkOPsImp::processTrustedProposal(RCLCxPeerPos peerPos)
1908{
1909 auto const& peerKey = peerPos.publicKey();
1910 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
1911 {
1912 // Could indicate a operator misconfiguration where two nodes are
1913 // running with the same validator key configured, so this isn't fatal,
1914 // and it doesn't necessarily indicate peer misbehavior. But since this
1915 // is a trusted message, it could be a very big deal. Either way, we
1916 // don't want to relay the proposal. Note that the byzantine behavior
1917 // detection in handleNewValidation will notify other peers.
1918 //
1919 // Another, innocuous explanation is unusual message routing and delays,
1920 // causing this node to receive its own messages back.
1921 JLOG(m_journal.error()) << "Received a proposal signed by MY KEY from a peer. This may "
1922 "indicate a misconfiguration where another node has the same "
1923 "validator key, or may be caused by unusual message routing and "
1924 "delays.";
1925 return false;
1926 }
1927
1928 return mConsensus.peerProposal(app_.timeKeeper().closeTime(), peerPos);
1929}
1930
1931void
1932NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
1933{
1934 // We now have an additional transaction set
1935 // either created locally during the consensus process
1936 // or acquired from a peer
1937
1938 // Inform peers we have this set
1939 protocol::TMHaveTransactionSet msg;
1940 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
1941 msg.set_status(protocol::tsHAVE);
1942 app_.overlay().foreach(send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
1943
1944 // We acquired it because consensus asked us to
1945 if (fromAcquire)
1946 mConsensus.gotTxSet(app_.timeKeeper().closeTime(), RCLTxSet{map});
1947}
1948
void
NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
{
    // Wrap up a consensus round: retire stale peer status, re-evaluate the
    // network's last closed ledger, possibly promote our operating mode, and
    // kick off the next round.
    uint256 deadLedger = m_ledgerMaster.getClosedLedger()->header().parentHash;

    // Peers still reporting the superseded ledger have stale status.
    for (auto const& it : app_.overlay().getActivePeers())
    {
        if (it && (it->getClosedLedgerHash() == deadLedger))
        {
            JLOG(m_journal.trace()) << "Killing obsolete peer status";
            it->cycleStatus();
        }
    }

    uint256 networkClosed;
    bool ledgerChange = checkLastClosedLedger(app_.overlay().getActivePeers(), networkClosed);

    if (networkClosed.isZero())
    {
        CLOG(clog) << "endConsensus last closed ledger is zero. ";
        return;
    }

    // WRITEME: Unless we are in FULL and in the process of doing a consensus,
    // we must count how many nodes share our LCL, how many nodes disagree with
    // our LCL, and how many validations our LCL has. We also want to check
    // timing to make sure there shouldn't be a newer LCL. We need this
    // information to do the next three tests.

    if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::SYNCING)) && !ledgerChange)
    {
        // Count number of peers that agree with us and UNL nodes whose
        // validations we have for LCL. If the ledger is good enough, go to
        // TRACKING - TODO
        if (!needNetworkLedger_)
            setMode(OperatingMode::TRACKING);
    }

    if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) && !ledgerChange)
    {
        // check if the ledger is good enough to go to FULL
        // Note: Do not go to FULL if we don't have the previous ledger
        // check if the ledger is bad enough to go to CONNECTED -- TODO
        auto current = m_ledgerMaster.getCurrentLedger();
        // Only go FULL if the current ledger is recent enough (within two
        // close-time resolutions of its parent's close).
        if (app_.timeKeeper().now() < (current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
        {
            setMode(OperatingMode::FULL);
        }
    }

    beginConsensus(networkClosed, clog);
}
2001
2002void
2003NetworkOPsImp::consensusViewChange()
2004{
2005 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2006 {
2007 setMode(OperatingMode::CONNECTED);
2008 }
2009}
2010
void
NetworkOPsImp::pubManifest(Manifest const& mo)
{
    // Publish a received validator manifest to "manifests" stream
    // subscribers, pruning any subscribers that have gone away.
    // VFALCO consider std::shared_mutex
    std::lock_guard sl(mSubLock);

    if (!mStreamMaps[sManifests].empty())
    {

        jvObj[jss::type] = "manifestReceived";
        jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
        if (mo.signingKey)
            jvObj[jss::signing_key] = toBase58(TokenType::NodePublic, *mo.signingKey);
        jvObj[jss::seq] = Json::UInt(mo.sequence);
        if (auto sig = mo.getSignature())
            jvObj[jss::signature] = strHex(*sig);
        jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
        if (!mo.domain.empty())
            jvObj[jss::domain] = mo.domain;
        jvObj[jss::manifest] = strHex(mo.serialized);

        // Send to each live subscriber; erase expired weak_ptr entries.
        for (auto i = mStreamMaps[sManifests].begin(); i != mStreamMaps[sManifests].end();)
        {
            if (auto p = i->second.lock())
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sManifests].erase(i);
            }
        }
    }
}
2047
// Snapshot of the server's current load and fee state, used to decide when
// a change should be published to "server" stream subscribers.
NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
    XRPAmount fee,
    TxQ::Metrics&& escalationMetrics,
    LoadFeeTrack const& loadFeeTrack)
    : loadFactorServer{loadFeeTrack.getLoadFactor()}
    , loadBaseServer{loadFeeTrack.getLoadBase()}
    , baseFee{fee}
    , em{std::move(escalationMetrics)}
{
}
2058
// Inequality comparison between two fee summaries (signature line elided in
// this view; presumably ServerFeeSummary::operator!= — confirm against the
// class declaration). Summaries differ when any load/fee field differs or
// when only one carries escalation metrics; if both have metrics, the
// individual fee levels are compared.
bool
{
    if (loadFactorServer != b.loadFactorServer || loadBaseServer != b.loadBaseServer || baseFee != b.baseFee ||
        em.has_value() != b.em.has_value())
        return true;

    if (em && b.em)
    {
        return (
            em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
            em->openLedgerFeeLevel != b.em->openLedgerFeeLevel || em->referenceFeeLevel != b.em->referenceFeeLevel);
    }

    return false;
}
2075
// Need to cap uint64 to uint32 due to JSON limitations
// (clamps to the uint32 maximum rather than truncating bits).
static std::uint32_t
{

    return std::min(max32, v);
};
2084
// Publish current server status (load, fees, escalation metrics) to
// "server" stream subscribers (signature line elided in this view;
// presumably NetworkOPsImp::pubServer — confirm against the header).
void
{
    // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
    //             list into a local array while holding the lock then release
    //             the lock and call send on everyone.
    //

    if (!mStreamMaps[sServer].empty())
    {

            app_.openLedger().current()->fees().base,
            app_.getFeeTrack()};

        jvObj[jss::type] = "serverStatus";
        jvObj[jss::server_status] = strOperatingMode();
        jvObj[jss::load_base] = f.loadBaseServer;
        jvObj[jss::load_factor_server] = f.loadFactorServer;
        jvObj[jss::base_fee] = f.baseFee.jsonClipped();

        if (f.em)
        {
            // Effective load factor is the larger of the server's load and
            // the open-ledger fee escalation level (scaled to load units).
            auto const loadFactor = std::max(
                safe_cast<std::uint64_t>(f.loadFactorServer),
                mulDiv(f.em->openLedgerFeeLevel, f.loadBaseServer, f.em->referenceFeeLevel).value_or(xrpl::muldiv_max));

            jvObj[jss::load_factor] = trunc32(loadFactor);
            jvObj[jss::load_factor_fee_escalation] = f.em->openLedgerFeeLevel.jsonClipped();
            jvObj[jss::load_factor_fee_queue] = f.em->minProcessingFeeLevel.jsonClipped();
            jvObj[jss::load_factor_fee_reference] = f.em->referenceFeeLevel.jsonClipped();
        }
        else
            jvObj[jss::load_factor] = f.loadFactorServer;

        // Remember what we sent so changes can be detected later.
        mLastFeeSummary = f;

        // Send to each live subscriber; erase expired weak_ptr entries.
        for (auto i = mStreamMaps[sServer].begin(); i != mStreamMaps[sServer].end();)
        {
            InfoSub::pointer p = i->second.lock();

            // VFALCO TODO research the possibility of using thread queues and
            //             linearizing the deletion of subscribers with the
            //             sending of JSON data.
            if (p)
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sServer].erase(i);
            }
        }
    }
}
2144
// Publish a consensus phase change to "consensus" stream subscribers
// (signature line elided in this view; takes the new ConsensusPhase —
// confirm against the header).
void
{

    auto& streamMap = mStreamMaps[sConsensusPhase];
    if (!streamMap.empty())
    {
        jvObj[jss::type] = "consensusPhase";
        jvObj[jss::consensus] = to_string(phase);

        // Send to each live subscriber; erase expired weak_ptr entries.
        for (auto i = streamMap.begin(); i != streamMap.end();)
        {
            if (auto p = i->second.lock())
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = streamMap.erase(i);
            }
        }
    }
}
2171
// Publish a received validation to "validations" stream subscribers
// (signature line elided in this view; presumably NetworkOPsImp::pubValidation
// taking the STValidation — confirm against the header).
void
{
    // VFALCO consider std::shared_mutex

    if (!mStreamMaps[sValidations].empty())
    {

        auto const signerPublic = val->getSignerPublic();

        jvObj[jss::type] = "validationReceived";
        jvObj[jss::validation_public_key] = toBase58(TokenType::NodePublic, signerPublic);
        jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
        jvObj[jss::signature] = strHex(val->getSignature());
        jvObj[jss::full] = val->isFull();
        jvObj[jss::flags] = val->getFlags();
        jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
        jvObj[jss::data] = strHex(val->getSerializer().slice());
        jvObj[jss::network_id] = app_.config().NETWORK_ID;

        if (auto version = (*val)[~sfServerVersion])
            jvObj[jss::server_version] = std::to_string(*version);

        if (auto cookie = (*val)[~sfCookie])
            jvObj[jss::cookie] = std::to_string(*cookie);

        if (auto hash = (*val)[~sfValidatedHash])
            jvObj[jss::validated_hash] = strHex(*hash);

        // Report the master key when the signer is an ephemeral key.
        auto const masterKey = app_.validatorManifests().getMasterKey(signerPublic);

        if (masterKey != signerPublic)
            jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);

        // NOTE *seq is a number, but old API versions used string. We replace
        // number with a string using MultiApiJson near end of this function
        if (auto const seq = (*val)[~sfLedgerSequence])
            jvObj[jss::ledger_index] = *seq;

        if (val->isFieldPresent(sfAmendments))
        {
            jvObj[jss::amendments] = Json::Value(Json::arrayValue);
            for (auto const& amendment : val->getFieldV256(sfAmendments))
                jvObj[jss::amendments].append(to_string(amendment));
        }

        if (auto const closeTime = (*val)[~sfCloseTime])
            jvObj[jss::close_time] = *closeTime;

        if (auto const loadFee = (*val)[~sfLoadFee])
            jvObj[jss::load_fee] = *loadFee;

        if (auto const baseFee = val->at(~sfBaseFee))
            jvObj[jss::base_fee] = static_cast<double>(*baseFee);

        if (auto const reserveBase = val->at(~sfReserveBase))
            jvObj[jss::reserve_base] = *reserveBase;

        if (auto const reserveInc = val->at(~sfReserveIncrement))
            jvObj[jss::reserve_inc] = *reserveInc;

        // (The ~ operator converts the Proxy to a std::optional, which
        // simplifies later operations)
        if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops); baseFeeXRP && baseFeeXRP->native())
            jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();

        if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops); reserveBaseXRP && reserveBaseXRP->native())
            jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();

        if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops); reserveIncXRP && reserveIncXRP->native())
            jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();

        // NOTE Use MultiApiJson to publish two slightly different JSON objects
        // for consumers supporting different API versions
        MultiApiJson multiObj{jvObj};
        multiObj.visit(
            RPC::apiVersion<1>, //
            [](Json::Value& jvTx) {
                // Type conversion for older API versions to string
                if (jvTx.isMember(jss::ledger_index))
                {
                    jvTx[jss::ledger_index] = std::to_string(jvTx[jss::ledger_index].asUInt());
                }
            });

        // Send the version-appropriate object to each live subscriber;
        // erase expired weak_ptr entries.
        for (auto i = mStreamMaps[sValidations].begin(); i != mStreamMaps[sValidations].end();)
        {
            if (auto p = i->second.lock())
            {
                multiObj.visit(
                    p->getApiVersion(), //
                    [&](Json::Value const& jv) { p->send(jv, true); });
                ++i;
            }
            else
            {
                i = mStreamMaps[sValidations].erase(i);
            }
        }
    }
}
2275
// Publish a peer status change to "peer_status" stream subscribers
// (signature line elided in this view; presumably takes a callable `func`
// producing the event JSON — confirm against the header). The callable is
// only invoked when there is at least one subscriber.
void
{

    if (!mStreamMaps[sPeerStatus].empty())
    {
        Json::Value jvObj(func());

        jvObj[jss::type] = "peerStatusChange";

        // Send to each live subscriber; erase expired weak_ptr entries.
        for (auto i = mStreamMaps[sPeerStatus].begin(); i != mStreamMaps[sPeerStatus].end();)
        {
            InfoSub::pointer p = i->second.lock();

            if (p)
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sPeerStatus].erase(i);
            }
        }
    }
}
2303
// Change the server's operating mode (signature line elided in this view;
// presumably NetworkOPsImp::setMode(OperatingMode om) — confirm against the
// header). Blocked servers are never allowed above CONNECTED. Publishes the
// new state to "server" stream subscribers when the mode actually changes.
void
{
    using namespace std::chrono_literals;
    if (om == OperatingMode::CONNECTED)
    {
    }
    else if (om == OperatingMode::SYNCING)
    {
    }

    // A blocked server (amendment or UNL) is capped at CONNECTED.
    if ((om > OperatingMode::CONNECTED) && isBlocked())

    if (mMode == om)
        return;

    mMode = om;

    accounting_.mode(om);

    JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
    pubServer();
}
2332
// Handle a validation received from `source` (signature line elided in this
// view; presumably NetworkOPsImp::recvValidation(val, source) — confirm
// against the header). Returns whether the validation should be relayed.
bool
{
    JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;

    // Deduplicate concurrent handling of the same ledger hash: if another
    // thread is already processing it, bypass the (expensive) accept path.
    BypassAccept bypassAccept = BypassAccept::no;
    try
    {
        if (pendingValidations_.contains(val->getLedgerHash()))
            bypassAccept = BypassAccept::yes;
        else
            pendingValidations_.insert(val->getLedgerHash());
        // Release the lock while doing the slow handling work.
        scope_unlock unlock(lock);
        handleNewValidation(app_, val, source, bypassAccept, m_journal);
    }
    catch (std::exception const& e)
    {
        JLOG(m_journal.warn()) << "Exception thrown for handling new validation " << val->getLedgerHash() << ": "
                               << e.what();
    }
    catch (...)
    {
        JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation " << val->getLedgerHash();
    }
    // Only the thread that inserted the hash removes it.
    if (bypassAccept == BypassAccept::no)
    {
        pendingValidations_.erase(val->getLedgerHash());
    }
    lock.unlock();

    pubValidation(val);

    JLOG(m_journal.debug()) << [this, &val]() -> auto {
        ss << "VALIDATION: " << val->render() << " master_key: ";
        auto master = app_.validators().getTrustedKey(val->getSignerPublic());
        if (master)
        {
            ss << toBase58(TokenType::NodePublic, *master);
        }
        else
        {
            ss << "none";
        }
        return ss.str();
    }();

    // We will always relay trusted validations; if configured, we will
    // also relay all untrusted validations.
    return app_.config().RELAY_UNTRUSTED_VALIDATIONS == 1 || val->isTrusted();
}
2385
// Body of getConsensusInfo (return type and signature lines elided in this
// view): returns the consensus engine's JSON state dump.
{
    return mConsensus.getJson(true);
}
2391
2393NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2394{
2396
2397 // System-level warnings
2398 {
2399 Json::Value warnings{Json::arrayValue};
2400 if (isAmendmentBlocked())
2401 {
2402 Json::Value& w = warnings.append(Json::objectValue);
2403 w[jss::id] = warnRPC_AMENDMENT_BLOCKED;
2404 w[jss::message] =
2405 "This server is amendment blocked, and must be updated to be "
2406 "able to stay in sync with the network.";
2407 }
2408 if (isUNLBlocked())
2409 {
2410 Json::Value& w = warnings.append(Json::objectValue);
2411 w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
2412 w[jss::message] =
2413 "This server has an expired validator list. validators.txt "
2414 "may be incorrectly configured or some [validator_list_sites] "
2415 "may be unreachable.";
2416 }
2417 if (admin && isAmendmentWarned())
2418 {
2419 Json::Value& w = warnings.append(Json::objectValue);
2420 w[jss::id] = warnRPC_UNSUPPORTED_MAJORITY;
2421 w[jss::message] =
2422 "One or more unsupported amendments have reached majority. "
2423 "Upgrade to the latest version before they are activated "
2424 "to avoid being amendment blocked.";
2425 if (auto const expected = app_.getAmendmentTable().firstUnsupportedExpected())
2426 {
2427 auto& d = w[jss::details] = Json::objectValue;
2428 d[jss::expected_date] = expected->time_since_epoch().count();
2429 d[jss::expected_date_UTC] = to_string(*expected);
2430 }
2431 }
2432
2433 if (warnings.size())
2434 info[jss::warnings] = std::move(warnings);
2435 }
2436
2437 // hostid: unique string describing the machine
2438 if (human)
2439 info[jss::hostid] = getHostId(admin);
2440
2441 // domain: if configured with a domain, report it:
2442 if (!app_.config().SERVER_DOMAIN.empty())
2443 info[jss::server_domain] = app_.config().SERVER_DOMAIN;
2444
2445 info[jss::build_version] = BuildInfo::getVersionString();
2446
2447 info[jss::server_state] = strOperatingMode(admin);
2448
2449 info[jss::time] = to_string(std::chrono::floor<std::chrono::microseconds>(std::chrono::system_clock::now()));
2450
2452 info[jss::network_ledger] = "waiting";
2453
2454 info[jss::validation_quorum] = static_cast<Json::UInt>(app_.validators().quorum());
2455
2456 if (admin)
2457 {
2458 switch (app_.config().NODE_SIZE)
2459 {
2460 case 0:
2461 info[jss::node_size] = "tiny";
2462 break;
2463 case 1:
2464 info[jss::node_size] = "small";
2465 break;
2466 case 2:
2467 info[jss::node_size] = "medium";
2468 break;
2469 case 3:
2470 info[jss::node_size] = "large";
2471 break;
2472 case 4:
2473 info[jss::node_size] = "huge";
2474 break;
2475 }
2476
2477 auto when = app_.validators().expires();
2478
2479 if (!human)
2480 {
2481 if (when)
2482 info[jss::validator_list_expires] = safe_cast<Json::UInt>(when->time_since_epoch().count());
2483 else
2484 info[jss::validator_list_expires] = 0;
2485 }
2486 else
2487 {
2488 auto& x = (info[jss::validator_list] = Json::objectValue);
2489
2490 x[jss::count] = static_cast<Json::UInt>(app_.validators().count());
2491
2492 if (when)
2493 {
2494 if (*when == TimeKeeper::time_point::max())
2495 {
2496 x[jss::expiration] = "never";
2497 x[jss::status] = "active";
2498 }
2499 else
2500 {
2501 x[jss::expiration] = to_string(*when);
2502
2503 if (*when > app_.timeKeeper().now())
2504 x[jss::status] = "active";
2505 else
2506 x[jss::status] = "expired";
2507 }
2508 }
2509 else
2510 {
2511 x[jss::status] = "unknown";
2512 x[jss::expiration] = "unknown";
2513 }
2514 }
2515
2516#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2517 {
2518 auto& x = (info[jss::git] = Json::objectValue);
2519#ifdef GIT_COMMIT_HASH
2520 x[jss::hash] = GIT_COMMIT_HASH;
2521#endif
2522#ifdef GIT_BRANCH
2523 x[jss::branch] = GIT_BRANCH;
2524#endif
2525 }
2526#endif
2527 }
2528 info[jss::io_latency_ms] = static_cast<Json::UInt>(app_.getIOLatency().count());
2529
2530 if (admin)
2531 {
2532 if (auto const localPubKey = app_.validators().localPublicKey(); localPubKey && app_.getValidationPublicKey())
2533 {
2534 info[jss::pubkey_validator] = toBase58(TokenType::NodePublic, localPubKey.value());
2535 }
2536 else
2537 {
2538 info[jss::pubkey_validator] = "none";
2539 }
2540 }
2541
2542 if (counters)
2543 {
2544 info[jss::counters] = app_.getPerfLog().countersJson();
2545
2546 Json::Value nodestore(Json::objectValue);
2547 app_.getNodeStore().getCountsJson(nodestore);
2548 info[jss::counters][jss::nodestore] = nodestore;
2549 info[jss::current_activities] = app_.getPerfLog().currentJson();
2550 }
2551
2552 info[jss::pubkey_node] = toBase58(TokenType::NodePublic, app_.nodeIdentity().first);
2553
2554 info[jss::complete_ledgers] = app_.getLedgerMaster().getCompleteLedgers();
2555
2557 info[jss::amendment_blocked] = true;
2558
2559 auto const fp = m_ledgerMaster.getFetchPackCacheSize();
2560
2561 if (fp != 0)
2562 info[jss::fetch_pack] = Json::UInt(fp);
2563
2564 info[jss::peers] = Json::UInt(app_.overlay().size());
2565
2566 Json::Value lastClose = Json::objectValue;
2567 lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
2568
2569 if (human)
2570 {
2571 lastClose[jss::converge_time_s] = std::chrono::duration<double>{mConsensus.prevRoundTime()}.count();
2572 }
2573 else
2574 {
2575 lastClose[jss::converge_time] = Json::Int(mConsensus.prevRoundTime().count());
2576 }
2577
2578 info[jss::last_close] = lastClose;
2579
2580 // info[jss::consensus] = mConsensus.getJson();
2581
2582 if (admin)
2583 info[jss::load] = m_job_queue.getJson();
2584
2585 if (auto const netid = app_.overlay().networkID())
2586 info[jss::network_id] = static_cast<Json::UInt>(*netid);
2587
2588 auto const escalationMetrics = app_.getTxQ().getMetrics(*app_.openLedger().current());
2589
2590 auto const loadFactorServer = app_.getFeeTrack().getLoadFactor();
2591 auto const loadBaseServer = app_.getFeeTrack().getLoadBase();
2592 /* Scale the escalated fee level to unitless "load factor".
2593 In practice, this just strips the units, but it will continue
2594 to work correctly if either base value ever changes. */
2595 auto const loadFactorFeeEscalation =
2596 mulDiv(escalationMetrics.openLedgerFeeLevel, loadBaseServer, escalationMetrics.referenceFeeLevel)
2598
2599 auto const loadFactor = std::max(safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2600
2601 if (!human)
2602 {
2603 info[jss::load_base] = loadBaseServer;
2604 info[jss::load_factor] = trunc32(loadFactor);
2605 info[jss::load_factor_server] = loadFactorServer;
2606
2607 /* Json::Value doesn't support uint64, so clamp to max
2608 uint32 value. This is mostly theoretical, since there
2609 probably isn't enough extant XRP to drive the factor
2610 that high.
2611 */
2612 info[jss::load_factor_fee_escalation] = escalationMetrics.openLedgerFeeLevel.jsonClipped();
2613 info[jss::load_factor_fee_queue] = escalationMetrics.minProcessingFeeLevel.jsonClipped();
2614 info[jss::load_factor_fee_reference] = escalationMetrics.referenceFeeLevel.jsonClipped();
2615 }
2616 else
2617 {
2618 info[jss::load_factor] = static_cast<double>(loadFactor) / loadBaseServer;
2619
2620 if (loadFactorServer != loadFactor)
2621 info[jss::load_factor_server] = static_cast<double>(loadFactorServer) / loadBaseServer;
2622
2623 if (admin)
2624 {
2626 if (fee != loadBaseServer)
2627 info[jss::load_factor_local] = static_cast<double>(fee) / loadBaseServer;
2628 fee = app_.getFeeTrack().getRemoteFee();
2629 if (fee != loadBaseServer)
2630 info[jss::load_factor_net] = static_cast<double>(fee) / loadBaseServer;
2631 fee = app_.getFeeTrack().getClusterFee();
2632 if (fee != loadBaseServer)
2633 info[jss::load_factor_cluster] = static_cast<double>(fee) / loadBaseServer;
2634 }
2635 if (escalationMetrics.openLedgerFeeLevel != escalationMetrics.referenceFeeLevel &&
2636 (admin || loadFactorFeeEscalation != loadFactor))
2637 info[jss::load_factor_fee_escalation] =
2638 escalationMetrics.openLedgerFeeLevel.decimalFromReference(escalationMetrics.referenceFeeLevel);
2639 if (escalationMetrics.minProcessingFeeLevel != escalationMetrics.referenceFeeLevel)
2640 info[jss::load_factor_fee_queue] =
2641 escalationMetrics.minProcessingFeeLevel.decimalFromReference(escalationMetrics.referenceFeeLevel);
2642 }
2643
2644 bool valid = false;
2645 auto lpClosed = m_ledgerMaster.getValidatedLedger();
2646
2647 if (lpClosed)
2648 valid = true;
2649 else
2650 lpClosed = m_ledgerMaster.getClosedLedger();
2651
2652 if (lpClosed)
2653 {
2654 XRPAmount const baseFee = lpClosed->fees().base;
2656 l[jss::seq] = Json::UInt(lpClosed->header().seq);
2657 l[jss::hash] = to_string(lpClosed->header().hash);
2658
2659 if (!human)
2660 {
2661 l[jss::base_fee] = baseFee.jsonClipped();
2662 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2663 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2664 l[jss::close_time] = Json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
2665 }
2666 else
2667 {
2668 l[jss::base_fee_xrp] = baseFee.decimalXRP();
2669 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2670 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2671
2672 if (auto const closeOffset = app_.timeKeeper().closeOffset(); std::abs(closeOffset.count()) >= 60)
2673 l[jss::close_time_offset] = static_cast<std::uint32_t>(closeOffset.count());
2674
2675 constexpr std::chrono::seconds highAgeThreshold{1000000};
2677 {
2678 auto const age = m_ledgerMaster.getValidatedLedgerAge();
2679 l[jss::age] = Json::UInt(age < highAgeThreshold ? age.count() : 0);
2680 }
2681 else
2682 {
2683 auto lCloseTime = lpClosed->header().closeTime;
2684 auto closeTime = app_.timeKeeper().closeTime();
2685 if (lCloseTime <= closeTime)
2686 {
2687 using namespace std::chrono_literals;
2688 auto age = closeTime - lCloseTime;
2689 l[jss::age] = Json::UInt(age < highAgeThreshold ? age.count() : 0);
2690 }
2691 }
2692 }
2693
2694 if (valid)
2695 info[jss::validated_ledger] = l;
2696 else
2697 info[jss::closed_ledger] = l;
2698
2699 auto lpPublished = m_ledgerMaster.getPublishedLedger();
2700 if (!lpPublished)
2701 info[jss::published_ledger] = "none";
2702 else if (lpPublished->header().seq != lpClosed->header().seq)
2703 info[jss::published_ledger] = lpPublished->header().seq;
2704 }
2705
2706 accounting_.json(info);
2707 info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
2708 info[jss::jq_trans_overflow] = std::to_string(app_.overlay().getJqTransOverflow());
2709 info[jss::peer_disconnects] = std::to_string(app_.overlay().getPeerDisconnect());
2710 info[jss::peer_disconnects_resources] = std::to_string(app_.overlay().getPeerDisconnectCharges());
2711
2712 // This array must be sorted in increasing order.
2713 static constexpr std::array<std::string_view, 7> protocols{"http", "https", "peer", "ws", "ws2", "wss", "wss2"};
2714 static_assert(std::is_sorted(std::begin(protocols), std::end(protocols)));
2715 {
2717 for (auto const& port : app_.getServerHandler().setup().ports)
2718 {
2719 // Don't publish admin ports for non-admin users
2720 if (!admin &&
2721 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() && port.admin_user.empty() &&
2722 port.admin_password.empty()))
2723 continue;
2726 std::begin(port.protocol),
2727 std::end(port.protocol),
2728 std::begin(protocols),
2729 std::end(protocols),
2730 std::back_inserter(proto));
2731 if (!proto.empty())
2732 {
2733 auto& jv = ports.append(Json::Value(Json::objectValue));
2734 jv[jss::port] = std::to_string(port.port);
2735 jv[jss::protocol] = Json::Value{Json::arrayValue};
2736 for (auto const& p : proto)
2737 jv[jss::protocol].append(p);
2738 }
2739 }
2740
2741 if (app_.config().exists(SECTION_PORT_GRPC))
2742 {
2743 auto const& grpcSection = app_.config().section(SECTION_PORT_GRPC);
2744 auto const optPort = grpcSection.get("port");
2745 if (optPort && grpcSection.get("ip"))
2746 {
2747 auto& jv = ports.append(Json::Value(Json::objectValue));
2748 jv[jss::port] = *optPort;
2749 jv[jss::protocol] = Json::Value{Json::arrayValue};
2750 jv[jss::protocol].append("grpc");
2751 }
2752 }
2753 info[jss::ports] = std::move(ports);
2754 }
2755
2756 return info;
2757}
2758
2759void
2764
2770
2771void
2773 std::shared_ptr<ReadView const> const& ledger,
2774 std::shared_ptr<STTx const> const& transaction,
2775 TER result)
2776{
2777 // never publish an inner txn inside a batch txn. The flag should
2778 // only be set if the Batch feature is enabled. If Batch is not
2779 // enabled, the flag is always invalid, so don't publish it
2780 // regardless.
2781 if (transaction->isFlag(tfInnerBatchTxn))
2782 return;
2783
2784 MultiApiJson jvObj = transJson(transaction, result, false, ledger, std::nullopt);
2785
2786 {
2788
2789 auto it = mStreamMaps[sRTTransactions].begin();
2790 while (it != mStreamMaps[sRTTransactions].end())
2791 {
2792 InfoSub::pointer p = it->second.lock();
2793
2794 if (p)
2795 {
2796 jvObj.visit(
2797 p->getApiVersion(), //
2798 [&](Json::Value const& jv) { p->send(jv, true); });
2799 ++it;
2800 }
2801 else
2802 {
2803 it = mStreamMaps[sRTTransactions].erase(it);
2804 }
2805 }
2806 }
2807
2808 pubProposedAccountTransaction(ledger, transaction, result);
2809}
2810
2811void
2813{
2814 // Ledgers are published only when they acquire sufficient validations
2815 // Holes are filled across connection loss or other catastrophe
2816
2817 std::shared_ptr<AcceptedLedger> alpAccepted = app_.getAcceptedLedgerCache().fetch(lpAccepted->header().hash);
2818 if (!alpAccepted)
2819 {
2820 alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted, app_);
2821 app_.getAcceptedLedgerCache().canonicalize_replace_client(lpAccepted->header().hash, alpAccepted);
2822 }
2823
2824 XRPL_ASSERT(alpAccepted->getLedger().get() == lpAccepted.get(), "xrpl::NetworkOPsImp::pubLedger : accepted input");
2825
2826 {
2827 JLOG(m_journal.debug()) << "Publishing ledger " << lpAccepted->header().seq << " " << lpAccepted->header().hash;
2828
2830
2831 if (!mStreamMaps[sLedger].empty())
2832 {
2834
2835 jvObj[jss::type] = "ledgerClosed";
2836 jvObj[jss::ledger_index] = lpAccepted->header().seq;
2837 jvObj[jss::ledger_hash] = to_string(lpAccepted->header().hash);
2838 jvObj[jss::ledger_time] = Json::Value::UInt(lpAccepted->header().closeTime.time_since_epoch().count());
2839
2840 jvObj[jss::network_id] = app_.config().NETWORK_ID;
2841
2842 if (!lpAccepted->rules().enabled(featureXRPFees))
2843 jvObj[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
2844 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
2845 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
2846 jvObj[jss::reserve_inc] = lpAccepted->fees().increment.jsonClipped();
2847
2848 jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
2849
2851 {
2852 jvObj[jss::validated_ledgers] = app_.getLedgerMaster().getCompleteLedgers();
2853 }
2854
2855 auto it = mStreamMaps[sLedger].begin();
2856 while (it != mStreamMaps[sLedger].end())
2857 {
2858 InfoSub::pointer p = it->second.lock();
2859 if (p)
2860 {
2861 p->send(jvObj, true);
2862 ++it;
2863 }
2864 else
2865 it = mStreamMaps[sLedger].erase(it);
2866 }
2867 }
2868
2869 if (!mStreamMaps[sBookChanges].empty())
2870 {
2871 Json::Value jvObj = xrpl::RPC::computeBookChanges(lpAccepted);
2872
2873 auto it = mStreamMaps[sBookChanges].begin();
2874 while (it != mStreamMaps[sBookChanges].end())
2875 {
2876 InfoSub::pointer p = it->second.lock();
2877 if (p)
2878 {
2879 p->send(jvObj, true);
2880 ++it;
2881 }
2882 else
2883 it = mStreamMaps[sBookChanges].erase(it);
2884 }
2885 }
2886
2887 {
2888 static bool firstTime = true;
2889 if (firstTime)
2890 {
2891 // First validated ledger, start delayed SubAccountHistory
2892 firstTime = false;
2893 for (auto& outer : mSubAccountHistory)
2894 {
2895 for (auto& inner : outer.second)
2896 {
2897 auto& subInfo = inner.second;
2898 if (subInfo.index_->separationLedgerSeq_ == 0)
2899 {
2900 subAccountHistoryStart(alpAccepted->getLedger(), subInfo);
2901 }
2902 }
2903 }
2904 }
2905 }
2906 }
2907
2908 // Don't lock since pubAcceptedTransaction is locking.
2909 for (auto const& accTx : *alpAccepted)
2910 {
2911 JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
2912 pubValidatedTransaction(lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
2913 }
2914}
2915
2916void
2918{
2920 app_.openLedger().current()->fees().base,
2922 app_.getFeeTrack()};
2923
2924 // only schedule the job if something has changed
2925 if (f != mLastFeeSummary)
2926 {
2927 m_job_queue.addJob(jtCLIENT_FEE_CHANGE, "PubFee", [this]() { pubServer(); });
2928 }
2929}
2930
2931void
2933{
2934 m_job_queue.addJob(jtCLIENT_CONSENSUS, "PubCons", [this, phase]() { pubConsensus(phase); });
2935}
2936
2937inline void
2939{
2940 m_localTX->sweep(view);
2941}
2942inline std::size_t
2944{
2945 return m_localTX->size();
2946}
2947
2948// This routine should only be used to publish accepted or validated
2949// transactions.
2952 std::shared_ptr<STTx const> const& transaction,
2953 TER result,
2954 bool validated,
2955 std::shared_ptr<ReadView const> const& ledger,
2957{
2959 std::string sToken;
2960 std::string sHuman;
2961
2962 transResultInfo(result, sToken, sHuman);
2963
2964 jvObj[jss::type] = "transaction";
2965 // NOTE jvObj is not a finished object for either API version. After
2966 // it's populated, we need to finish it for a specific API version. This is
2967 // done in a loop, near the end of this function.
2968 jvObj[jss::transaction] = transaction->getJson(JsonOptions::disable_API_prior_V2, false);
2969
2970 if (meta)
2971 {
2972 jvObj[jss::meta] = meta->get().getJson(JsonOptions::none);
2973 RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, transaction, meta->get());
2974 RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
2975 RPC::insertMPTokenIssuanceID(jvObj[jss::meta], transaction, meta->get());
2976 }
2977
2978 // add CTID where the needed data for it exists
2979 if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
2980 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
2981 {
2982 uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
2983 uint32_t netID = app_.config().NETWORK_ID;
2984 if (transaction->isFieldPresent(sfNetworkID))
2985 netID = transaction->getFieldU32(sfNetworkID);
2986
2987 if (std::optional<std::string> ctid = RPC::encodeCTID(ledger->header().seq, txnSeq, netID); ctid)
2988 jvObj[jss::ctid] = *ctid;
2989 }
2990 if (!ledger->open())
2991 jvObj[jss::ledger_hash] = to_string(ledger->header().hash);
2992
2993 if (validated)
2994 {
2995 jvObj[jss::ledger_index] = ledger->header().seq;
2996 jvObj[jss::transaction][jss::date] = ledger->header().closeTime.time_since_epoch().count();
2997 jvObj[jss::validated] = true;
2998 jvObj[jss::close_time_iso] = to_string_iso(ledger->header().closeTime);
2999
3000 // WRITEME: Put the account next seq here
3001 }
3002 else
3003 {
3004 jvObj[jss::validated] = false;
3005 jvObj[jss::ledger_current_index] = ledger->header().seq;
3006 }
3007
3008 jvObj[jss::status] = validated ? "closed" : "proposed";
3009 jvObj[jss::engine_result] = sToken;
3010 jvObj[jss::engine_result_code] = result;
3011 jvObj[jss::engine_result_message] = sHuman;
3012
3013 if (transaction->getTxnType() == ttOFFER_CREATE)
3014 {
3015 auto const account = transaction->getAccountID(sfAccount);
3016 auto const amount = transaction->getFieldAmount(sfTakerGets);
3017
3018 // If the offer create is not self funded then add the owner balance
3019 if (account != amount.issue().account)
3020 {
3021 auto const ownerFunds = accountFunds(*ledger, account, amount, fhIGNORE_FREEZE, app_.journal("View"));
3022 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3023 }
3024 }
3025
3026 std::string const hash = to_string(transaction->getTransactionID());
3027 MultiApiJson multiObj{jvObj};
3029 multiObj.visit(), //
3030 [&]<unsigned Version>(Json::Value& jvTx, std::integral_constant<unsigned, Version>) {
3031 RPC::insertDeliverMax(jvTx[jss::transaction], transaction->getTxnType(), Version);
3032
3033 if constexpr (Version > 1)
3034 {
3035 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3036 jvTx[jss::hash] = hash;
3037 }
3038 else
3039 {
3040 jvTx[jss::transaction][jss::hash] = hash;
3041 }
3042 });
3043
3044 return multiObj;
3045}
3046
3047void
3049 std::shared_ptr<ReadView const> const& ledger,
3050 AcceptedLedgerTx const& transaction,
3051 bool last)
3052{
3053 auto const& stTxn = transaction.getTxn();
3054
3055 // Create two different Json objects, for different API versions
3056 auto const metaRef = std::ref(transaction.getMeta());
3057 auto const trResult = transaction.getResult();
3058 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3059
3060 {
3062
3063 auto it = mStreamMaps[sTransactions].begin();
3064 while (it != mStreamMaps[sTransactions].end())
3065 {
3066 InfoSub::pointer p = it->second.lock();
3067
3068 if (p)
3069 {
3070 jvObj.visit(
3071 p->getApiVersion(), //
3072 [&](Json::Value const& jv) { p->send(jv, true); });
3073 ++it;
3074 }
3075 else
3076 it = mStreamMaps[sTransactions].erase(it);
3077 }
3078
3079 it = mStreamMaps[sRTTransactions].begin();
3080
3081 while (it != mStreamMaps[sRTTransactions].end())
3082 {
3083 InfoSub::pointer p = it->second.lock();
3084
3085 if (p)
3086 {
3087 jvObj.visit(
3088 p->getApiVersion(), //
3089 [&](Json::Value const& jv) { p->send(jv, true); });
3090 ++it;
3091 }
3092 else
3093 it = mStreamMaps[sRTTransactions].erase(it);
3094 }
3095 }
3096
3097 if (transaction.getResult() == tesSUCCESS)
3098 app_.getOrderBookDB().processTxn(ledger, transaction, jvObj);
3099
3100 pubAccountTransaction(ledger, transaction, last);
3101}
3102
3103void
3105 std::shared_ptr<ReadView const> const& ledger,
3106 AcceptedLedgerTx const& transaction,
3107 bool last)
3108{
3110 int iProposed = 0;
3111 int iAccepted = 0;
3112
3113 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3114 auto const currLedgerSeq = ledger->seq();
3115 {
3117
3119 {
3120 for (auto const& affectedAccount : transaction.getAffected())
3121 {
3122 if (auto simiIt = mSubRTAccount.find(affectedAccount); simiIt != mSubRTAccount.end())
3123 {
3124 auto it = simiIt->second.begin();
3125
3126 while (it != simiIt->second.end())
3127 {
3128 InfoSub::pointer p = it->second.lock();
3129
3130 if (p)
3131 {
3132 notify.insert(p);
3133 ++it;
3134 ++iProposed;
3135 }
3136 else
3137 it = simiIt->second.erase(it);
3138 }
3139 }
3140
3141 if (auto simiIt = mSubAccount.find(affectedAccount); simiIt != mSubAccount.end())
3142 {
3143 auto it = simiIt->second.begin();
3144 while (it != simiIt->second.end())
3145 {
3146 InfoSub::pointer p = it->second.lock();
3147
3148 if (p)
3149 {
3150 notify.insert(p);
3151 ++it;
3152 ++iAccepted;
3153 }
3154 else
3155 it = simiIt->second.erase(it);
3156 }
3157 }
3158
3159 if (auto historyIt = mSubAccountHistory.find(affectedAccount); historyIt != mSubAccountHistory.end())
3160 {
3161 auto& subs = historyIt->second;
3162 auto it = subs.begin();
3163 while (it != subs.end())
3164 {
3165 SubAccountHistoryInfoWeak const& info = it->second;
3166 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3167 {
3168 ++it;
3169 continue;
3170 }
3171
3172 if (auto isSptr = info.sinkWptr_.lock(); isSptr)
3173 {
3174 accountHistoryNotify.emplace_back(SubAccountHistoryInfo{isSptr, info.index_});
3175 ++it;
3176 }
3177 else
3178 {
3179 it = subs.erase(it);
3180 }
3181 }
3182 if (subs.empty())
3183 mSubAccountHistory.erase(historyIt);
3184 }
3185 }
3186 }
3187 }
3188
3189 JLOG(m_journal.trace()) << "pubAccountTransaction: "
3190 << "proposed=" << iProposed << ", accepted=" << iAccepted;
3191
3192 if (!notify.empty() || !accountHistoryNotify.empty())
3193 {
3194 auto const& stTxn = transaction.getTxn();
3195
3196 // Create two different Json objects, for different API versions
3197 auto const metaRef = std::ref(transaction.getMeta());
3198 auto const trResult = transaction.getResult();
3199 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3200
3201 for (InfoSub::ref isrListener : notify)
3202 {
3203 jvObj.visit(
3204 isrListener->getApiVersion(), //
3205 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3206 }
3207
3208 if (last)
3209 jvObj.set(jss::account_history_boundary, true);
3210
3211 XRPL_ASSERT(
3212 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::none,
3213 "xrpl::NetworkOPsImp::pubAccountTransaction : "
3214 "account_history_tx_stream not set");
3215 for (auto& info : accountHistoryNotify)
3216 {
3217 auto& index = info.index_;
3218 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3219 jvObj.set(jss::account_history_tx_first, true);
3220
3221 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3222
3223 jvObj.visit(
3224 info.sink_->getApiVersion(), //
3225 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3226 }
3227 }
3228}
3229
3230void
3232 std::shared_ptr<ReadView const> const& ledger,
3234 TER result)
3235{
3237 int iProposed = 0;
3238
3239 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3240
3241 {
3243
3244 if (mSubRTAccount.empty())
3245 return;
3246
3248 {
3249 for (auto const& affectedAccount : tx->getMentionedAccounts())
3250 {
3251 if (auto simiIt = mSubRTAccount.find(affectedAccount); simiIt != mSubRTAccount.end())
3252 {
3253 auto it = simiIt->second.begin();
3254
3255 while (it != simiIt->second.end())
3256 {
3257 InfoSub::pointer p = it->second.lock();
3258
3259 if (p)
3260 {
3261 notify.insert(p);
3262 ++it;
3263 ++iProposed;
3264 }
3265 else
3266 it = simiIt->second.erase(it);
3267 }
3268 }
3269 }
3270 }
3271 }
3272
3273 JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
3274
3275 if (!notify.empty() || !accountHistoryNotify.empty())
3276 {
3277 // Create two different Json objects, for different API versions
3278 MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3279
3280 for (InfoSub::ref isrListener : notify)
3281 jvObj.visit(
3282 isrListener->getApiVersion(), //
3283 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3284
3285 XRPL_ASSERT(
3286 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::none,
3287 "xrpl::NetworkOPs::pubProposedAccountTransaction : "
3288 "account_history_tx_stream not set");
3289 for (auto& info : accountHistoryNotify)
3290 {
3291 auto& index = info.index_;
3292 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3293 jvObj.set(jss::account_history_tx_first, true);
3294 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3295 jvObj.visit(
3296 info.sink_->getApiVersion(), //
3297 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3298 }
3299 }
3300}
3301
3302//
3303// Monitoring
3304//
3305
3306void
3307NetworkOPsImp::subAccount(InfoSub::ref isrListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
3308{
3309 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3310
3311 for (auto const& naAccountID : vnaAccountIDs)
3312 {
3313 JLOG(m_journal.trace()) << "subAccount: account: " << toBase58(naAccountID);
3314
3315 isrListener->insertSubAccountInfo(naAccountID, rt);
3316 }
3317
3319
3320 for (auto const& naAccountID : vnaAccountIDs)
3321 {
3322 auto simIterator = subMap.find(naAccountID);
3323 if (simIterator == subMap.end())
3324 {
3325 // Not found, note that account has a new single listener.
3326 SubMapType usisElement;
3327 usisElement[isrListener->getSeq()] = isrListener;
3328 // VFALCO NOTE This is making a needless copy of naAccountID
3329 subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3330 }
3331 else
3332 {
3333 // Found, note that the account has another listener.
3334 simIterator->second[isrListener->getSeq()] = isrListener;
3335 }
3336 }
3337}
3338
3339void
3340NetworkOPsImp::unsubAccount(InfoSub::ref isrListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
3341{
3342 for (auto const& naAccountID : vnaAccountIDs)
3343 {
3344 // Remove from the InfoSub
3345 isrListener->deleteSubAccountInfo(naAccountID, rt);
3346 }
3347
3348 // Remove from the server
3349 unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3350}
3351
3352void
3354{
3356
3357 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3358
3359 for (auto const& naAccountID : vnaAccountIDs)
3360 {
3361 auto simIterator = subMap.find(naAccountID);
3362
3363 if (simIterator != subMap.end())
3364 {
3365 // Found
3366 simIterator->second.erase(uSeq);
3367
3368 if (simIterator->second.empty())
3369 {
3370 // Don't need hash entry.
3371 subMap.erase(simIterator);
3372 }
3373 }
3374 }
3375}
3376
3377void
3379{
3380 enum DatabaseType { Sqlite, None };
3381 static auto const databaseType = [&]() -> DatabaseType {
3382 // Use a dynamic_cast to return DatabaseType::None
3383 // on failure.
3384 if (dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
3385 {
3386 return DatabaseType::Sqlite;
3387 }
3388 return DatabaseType::None;
3389 }();
3390
3391 if (databaseType == DatabaseType::None)
3392 {
3393 // LCOV_EXCL_START
3394 UNREACHABLE("xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
3395 JLOG(m_journal.error()) << "AccountHistory job for account " << toBase58(subInfo.index_->accountId_)
3396 << " no database";
3397 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3398 {
3399 sptr->send(rpcError(rpcINTERNAL), true);
3400 unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
3401 }
3402 return;
3403 // LCOV_EXCL_STOP
3404 }
3405
3406 app_.getJobQueue().addJob(jtCLIENT_ACCT_HIST, "HistTxStream", [this, dbType = databaseType, subInfo]() {
3407 auto const& accountId = subInfo.index_->accountId_;
3408 auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
3409 auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
3410
3411 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3412 << " started. lastLedgerSeq=" << lastLedgerSeq;
3413
3414 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx, std::shared_ptr<TxMeta> const& meta) -> bool {
3415 /*
3416 * genesis account: first tx is the one with seq 1
3417 * other account: first tx is the one created the account
3418 */
3419 if (accountId == genesisAccountId)
3420 {
3421 auto stx = tx->getSTransaction();
3422 if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
3423 return true;
3424 }
3425
3426 for (auto& node : meta->getNodes())
3427 {
3428 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3429 continue;
3430
3431 if (node.isFieldPresent(sfNewFields))
3432 {
3433 if (auto inner = dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields)); inner)
3434 {
3435 if (inner->isFieldPresent(sfAccount) && inner->getAccountID(sfAccount) == accountId)
3436 {
3437 return true;
3438 }
3439 }
3440 }
3441 }
3442
3443 return false;
3444 };
3445
3446 auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
3447 if (auto sptr = subInfo.sinkWptr_.lock())
3448 {
3449 sptr->send(jvObj, true);
3450 if (unsubscribe)
3451 unsubAccountHistory(sptr, accountId, false);
3452 return true;
3453 }
3454
3455 return false;
3456 };
3457
3458 auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
3459 if (auto sptr = subInfo.sinkWptr_.lock())
3460 {
3461 jvObj.visit(
3462 sptr->getApiVersion(), //
3463 [&](Json::Value const& jv) { sptr->send(jv, true); });
3464
3465 if (unsubscribe)
3466 unsubAccountHistory(sptr, accountId, false);
3467 return true;
3468 }
3469
3470 return false;
3471 };
3472
3473 auto getMoreTxns = [&](std::uint32_t minLedger,
3474 std::uint32_t maxLedger,
3476 -> std::optional<
3478 switch (dbType)
3479 {
3480 case Sqlite: {
3481 auto db = static_cast<SQLiteDatabase*>(&app_.getRelationalDatabase());
3482 RelationalDatabase::AccountTxPageOptions options{accountId, minLedger, maxLedger, marker, 0, true};
3483 return db->newestAccountTxPage(options);
3484 }
3485 // LCOV_EXCL_START
3486 default: {
3487 UNREACHABLE(
3488 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3489 "getMoreTxns : invalid database type");
3490 return {};
3491 }
3492 // LCOV_EXCL_STOP
3493 }
3494 };
3495
3496 /*
3497 * search backward until the genesis ledger or asked to stop
3498 */
3499 while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
3500 {
3501 int feeChargeCount = 0;
3502 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3503 {
3504 sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
3505 ++feeChargeCount;
3506 }
3507 else
3508 {
3509 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3510 << " no InfoSub. Fee charged " << feeChargeCount << " times.";
3511 return;
3512 }
3513
3514 // try to search in 1024 ledgers till reaching genesis ledgers
3515 auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3516 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3517 << ", working on ledger range [" << startLedgerSeq << "," << lastLedgerSeq << "]";
3518
3519 auto haveRange = [&]() -> bool {
3520 std::uint32_t validatedMin = UINT_MAX;
3521 std::uint32_t validatedMax = 0;
3522 auto haveSomeValidatedLedgers = app_.getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
3523
3524 return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq && lastLedgerSeq <= validatedMax;
3525 }();
3526
3527 if (!haveRange)
3528 {
3529 JLOG(m_journal.debug()) << "AccountHistory reschedule job for account " << toBase58(accountId)
3530 << ", incomplete ledger range [" << startLedgerSeq << "," << lastLedgerSeq
3531 << "]";
3533 return;
3534 }
3535
3537 while (!subInfo.index_->stopHistorical_)
3538 {
3539 auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3540 if (!dbResult)
3541 {
3542 // LCOV_EXCL_START
3543 UNREACHABLE(
3544 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3545 "getMoreTxns failed");
3546 JLOG(m_journal.debug())
3547 << "AccountHistory job for account " << toBase58(accountId) << " getMoreTxns failed.";
3548 send(rpcError(rpcINTERNAL), true);
3549 return;
3550 // LCOV_EXCL_STOP
3551 }
3552
3553 auto const& txns = dbResult->first;
3554 marker = dbResult->second;
3555 size_t num_txns = txns.size();
3556 for (size_t i = 0; i < num_txns; ++i)
3557 {
3558 auto const& [tx, meta] = txns[i];
3559
3560 if (!tx || !meta)
3561 {
3562 JLOG(m_journal.debug())
3563 << "AccountHistory job for account " << toBase58(accountId) << " empty tx or meta.";
3564 send(rpcError(rpcINTERNAL), true);
3565 return;
3566 }
3567 auto curTxLedger = app_.getLedgerMaster().getLedgerBySeq(tx->getLedger());
3568 if (!curTxLedger)
3569 {
3570 // LCOV_EXCL_START
3571 UNREACHABLE(
3572 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3573 "getLedgerBySeq failed");
3574 JLOG(m_journal.debug())
3575 << "AccountHistory job for account " << toBase58(accountId) << " no ledger.";
3576 send(rpcError(rpcINTERNAL), true);
3577 return;
3578 // LCOV_EXCL_STOP
3579 }
3580 std::shared_ptr<STTx const> stTxn = tx->getSTransaction();
3581 if (!stTxn)
3582 {
3583 // LCOV_EXCL_START
3584 UNREACHABLE(
3585 "NetworkOPsImp::addAccountHistoryJob : "
3586 "getSTransaction failed");
3587 JLOG(m_journal.debug())
3588 << "AccountHistory job for account " << toBase58(accountId) << " getSTransaction failed.";
3589 send(rpcError(rpcINTERNAL), true);
3590 return;
3591 // LCOV_EXCL_STOP
3592 }
3593
3594 auto const mRef = std::ref(*meta);
3595 auto const trR = meta->getResultTER();
3596 MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
3597
3598 jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
3599 if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
3600 jvTx.set(jss::account_history_boundary, true);
3601
3602 if (isFirstTx(tx, meta))
3603 {
3604 jvTx.set(jss::account_history_tx_first, true);
3605 sendMultiApiJson(jvTx, false);
3606
3607 JLOG(m_journal.trace())
3608 << "AccountHistory job for account " << toBase58(accountId) << " done, found last tx.";
3609 return;
3610 }
3611 else
3612 {
3613 sendMultiApiJson(jvTx, false);
3614 }
3615 }
3616
3617 if (marker)
3618 {
3619 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3620 << " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
3621 }
3622 else
3623 {
3624 break;
3625 }
3626 }
3627
3628 if (!subInfo.index_->stopHistorical_)
3629 {
3630 lastLedgerSeq = startLedgerSeq - 1;
3631 if (lastLedgerSeq <= 1)
3632 {
3633 JLOG(m_journal.trace())
3634 << "AccountHistory job for account " << toBase58(accountId) << " done, reached genesis ledger.";
3635 return;
3636 }
3637 }
3638 }
3639 });
3640}
3641
// Begin historical-transaction streaming for one account_history
// subscription, using `ledger` (a validated ledger) as the boundary
// between "historical" and "live" transactions.
// NOTE(review): the signature line is missing from this extract; per the
// member list it is subAccountHistoryStart(std::shared_ptr<ReadView const>
// const& ledger, SubAccountHistoryInfoWeak& subInfo) -- confirm upstream.
3642void
3644{
// Remember which ledger separates history from newly published txns.
3645 subInfo.index_->separationLedgerSeq_ = ledger->seq();
3646 auto const& accountId = subInfo.index_->accountId_;
3647 auto const accountKeylet = keylet::account(accountId);
// Account absent from the validated ledger: nothing to stream.
3648 if (!ledger->exists(accountKeylet))
3649 {
3650 JLOG(m_journal.debug()) << "subAccountHistoryStart, no account " << toBase58(accountId)
3651 << ", no need to add AccountHistory job.";
3652 return;
3653 }
// Genesis account with Sequence == 1 has never sent a transaction,
// so there is no history to fetch.
3654 if (accountId == genesisAccountId)
3655 {
3656 if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
3657 {
3658 if (sleAcct->getFieldU32(sfSequence) == 1)
3659 {
3660 JLOG(m_journal.debug()) << "subAccountHistoryStart, genesis account " << toBase58(accountId)
3661 << " does not have tx, no need to add AccountHistory job.";
3662 return;
3663 }
3664 }
3665 else
3666 {
3667 // LCOV_EXCL_START
3668 UNREACHABLE(
3669 "xrpl::NetworkOPsImp::subAccountHistoryStart : failed to "
3670 "access genesis account");
3671 return;
3672 // LCOV_EXCL_STOP
3673 }
3674 }
// Record where the backward search starts and mark the subscription
// as having historical data pending.
3675 subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
3676 subInfo.index_->haveHistorical_ = true;
3677
3678 JLOG(m_journal.debug()) << "subAccountHistoryStart, add AccountHistory job: accountId=" << toBase58(accountId)
3679 << ", currentLedgerSeq=" << ledger->seq();
3680
// Schedule the background job that pages backward through past txns.
3681 addAccountHistoryJob(subInfo);
3682}
3683
// Subscribe a listener to an account's transaction stream, including a
// backfill of the account's historical transactions. Returns rpcSUCCESS
// on success, rpcINVALID_PARAMS if the listener is already subscribed.
// NOTE(review): the signature, the mSubLock guard, and the declarations of
// `ahi`/`inner` are missing from this extract -- confirm against upstream.
3686{
// The listener tracks its own subscriptions; a duplicate is an error.
3687 if (!isrListener->insertSubAccountHistory(accountId))
3688 {
3689 JLOG(m_journal.debug()) << "subAccountHistory, already subscribed to account " << toBase58(accountId);
3690 return rpcINVALID_PARAMS;
3691 }
3692
// Insert (accountId -> {listener seq -> sub info}) into the map,
// creating the inner map on first subscription for this account.
3695 auto simIterator = mSubAccountHistory.find(accountId);
3696 if (simIterator == mSubAccountHistory.end())
3697 {
3699 inner.emplace(isrListener->getSeq(), ahi);
3700 mSubAccountHistory.insert(simIterator, std::make_pair(accountId, inner));
3701 }
3702 else
3703 {
3704 simIterator->second.emplace(isrListener->getSeq(), ahi);
3705 }
3706
// Start the historical backfill immediately if a validated ledger is
// available; otherwise the job starts once one appears.
3707 auto const ledger = app_.getLedgerMaster().getValidatedLedger();
3708 if (ledger)
3709 {
3710 subAccountHistoryStart(ledger, ahi);
3711 }
3712 else
3713 {
3714 // The node does not have validated ledgers, so wait for
3715 // one before start streaming.
3716 // In this case, the subscription is also considered successful.
3717 JLOG(m_journal.debug()) << "subAccountHistory, no validated ledger yet, delay start";
3718 }
3719
3720 return rpcSUCCESS;
3721}
3722
3723void
3724NetworkOPsImp::unsubAccountHistory(InfoSub::ref isrListener, AccountID const& account, bool historyOnly)
3725{
3726 if (!historyOnly)
3727 isrListener->deleteSubAccountHistory(account);
3728 unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
3729}
3730
// Remove (or, with historyOnly, merely stop the backfill of) an
// account_history subscription identified by (listener seq, account).
// NOTE(review): the signature line and the mSubLock guard are missing
// from this extract -- confirm against upstream.
3731void
3733{
3735 auto simIterator = mSubAccountHistory.find(account);
3736 if (simIterator != mSubAccountHistory.end())
3737 {
3738 auto& subInfoMap = simIterator->second;
3739 auto subInfoIter = subInfoMap.find(seq);
3740 if (subInfoIter != subInfoMap.end())
3741 {
// Signal the background AccountHistory job to stop paging.
3742 subInfoIter->second.index_->stopHistorical_ = true;
3743 }
3744
// Full unsubscribe: drop the entry, and drop the account's inner
// map entirely once its last subscriber is gone.
3745 if (!historyOnly)
3746 {
3747 simIterator->second.erase(seq);
3748 if (simIterator->second.empty())
3749 {
3750 mSubAccountHistory.erase(simIterator);
3751 }
3752 }
3753 JLOG(m_journal.debug()) << "unsubAccountHistory, account " << toBase58(account)
3754 << ", historyOnly = " << (historyOnly ? "true" : "false");
3755 }
3756}
3757
// Subscribe a listener to an order book's update stream. Always returns
// true; makeBookListeners is expected to create the listener group on
// demand, so a null result is treated as unreachable.
// NOTE(review): the signature line is missing from this extract.
3758bool
3760{
3761 if (auto listeners = app_.getOrderBookDB().makeBookListeners(book))
3762 listeners->addSubscriber(isrListener);
3763 else
3764 {
3765 // LCOV_EXCL_START
3766 UNREACHABLE("xrpl::NetworkOPsImp::subBook : null book listeners");
3767 // LCOV_EXCL_STOP
3768 }
3769 return true;
3770}
3771
// Remove a listener (by its InfoSub sequence) from an order book's update
// stream. Returns true even if no listener group exists for the book.
// NOTE(review): the signature line is missing from this extract.
3772bool
3774{
3775 if (auto listeners = app_.getOrderBookDB().getBookListeners(book))
3776 listeners->removeSubscriber(uSeq);
3777
3778 return true;
3779}
3780
// Close the current ledger in standalone mode (the `ledger_accept` RPC):
// run a simulated consensus round and return the new open ledger's
// sequence number. Throws if the server is not standalone.
// NOTE(review): the signature lines are missing from this extract.
3783{
3784 // This code-path is exclusively used when the server is in standalone
3785 // mode via `ledger_accept`
3786 XRPL_ASSERT(m_standalone, "xrpl::NetworkOPsImp::acceptLedger : is standalone");
3787
3788 if (!m_standalone)
3789 Throw<std::runtime_error>("Operation only possible in STANDALONE mode.");
3790
3791 // FIXME Could we improve on this and remove the need for a specialized
3792 // API in Consensus?
3793 beginConsensus(m_ledgerMaster.getClosedLedger()->header().hash, {});
3794 mConsensus.simulate(app_.timeKeeper().closeTime(), consensusDelay);
3795 return m_ledgerMaster.getCurrentLedger()->header().seq;
3796}
3797
3798// <-- bool: true=added, false=already there
// Subscribe a listener to the "ledger" stream. Populates jvResult with a
// snapshot of the latest validated ledger (index, hash, close time, fees,
// reserves, network id) so the client has a starting point. Returns true
// if the listener was newly added, false if already subscribed.
// NOTE(review): the signature, one hidden condition line (before the
// validated_ledgers block), and the lock line are missing from this
// extract -- confirm against upstream.
3799bool
3801{
3802 if (auto lpClosed = m_ledgerMaster.getValidatedLedger())
3803 {
3804 jvResult[jss::ledger_index] = lpClosed->header().seq;
3805 jvResult[jss::ledger_hash] = to_string(lpClosed->header().hash);
3806 jvResult[jss::ledger_time] = Json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
// fee_ref is only reported on networks without the XRPFees amendment.
3807 if (!lpClosed->rules().enabled(featureXRPFees))
3808 jvResult[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
3809 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
3810 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
3811 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
3812 jvResult[jss::network_id] = app_.config().NETWORK_ID;
3813 }
3814
3816 {
3817 jvResult[jss::validated_ledgers] = app_.getLedgerMaster().getCompleteLedgers();
3818 }
3819
// emplace().second is true only when this listener was not present.
3821 return mStreamMaps[sLedger].emplace(isrListener->getSeq(), isrListener).second;
3822}
3823
3824// <-- bool: true=added, false=already there
// Subscribe a listener to the "book_changes" stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3825bool
3827{
3829 return mStreamMaps[sBookChanges].emplace(isrListener->getSeq(), isrListener).second;
3830}
3831
3832// <-- bool: true=erased, false=was not there
// Unsubscribe a listener (by InfoSub sequence) from the "ledger" stream.
// Returns true if an entry was erased, false if it was not subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3833bool
3835{
3837 return mStreamMaps[sLedger].erase(uSeq);
3838}
3839
3840// <-- bool: true=erased, false=was not there
3841bool
3847
3848// <-- bool: true=added, false=already there
// Subscribe a listener to the "manifests" stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3849bool
3851{
3853 return mStreamMaps[sManifests].emplace(isrListener->getSeq(), isrListener).second;
3854}
3855
3856// <-- bool: true=erased, false=was not there
3857bool
3863
3864// <-- bool: true=added, false=already there
// Subscribe a listener to the "server" status stream. Fills jvResult with
// an initial server snapshot: operating mode, load base/factor, host id,
// node public key, and a random value. Returns true if the listener was
// newly added, false if already subscribed.
// NOTE(review): a lock line before the final emplace is missing from this
// extract -- confirm against upstream.
3865bool
3866NetworkOPsImp::subServer(InfoSub::ref isrListener, Json::Value& jvResult, bool admin)
3867{
3868 uint256 uRandom;
3869
3870 if (m_standalone)
3871 jvResult[jss::stand_alone] = m_standalone;
3872
3873 // CHECKME: is it necessary to provide a random number here?
3874 beast::rngfill(uRandom.begin(), uRandom.size(), crypto_prng());
3875
3876 auto const& feeTrack = app_.getFeeTrack();
3877 jvResult[jss::random] = to_string(uRandom);
3878 jvResult[jss::server_status] = strOperatingMode(admin);
3879 jvResult[jss::load_base] = feeTrack.getLoadBase();
3880 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
3881 jvResult[jss::hostid] = getHostId(admin);
3882 jvResult[jss::pubkey_node] = toBase58(TokenType::NodePublic, app_.nodeIdentity().first);
3883
3885 return mStreamMaps[sServer].emplace(isrListener->getSeq(), isrListener).second;
3886}
3887
3888// <-- bool: true=erased, false=was not there
// Unsubscribe a listener (by InfoSub sequence) from the "server" stream.
// Returns true if an entry was erased, false otherwise.
// NOTE(review): signature and lock lines are missing from this extract.
3889bool
3891{
3893 return mStreamMaps[sServer].erase(uSeq);
3894}
3895
3896// <-- bool: true=added, false=already there
// Subscribe a listener to the validated "transactions" stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3897bool
3899{
3901 return mStreamMaps[sTransactions].emplace(isrListener->getSeq(), isrListener).second;
3902}
3903
3904// <-- bool: true=erased, false=was not there
3905bool
3911
3912// <-- bool: true=added, false=already there
// Subscribe a listener to the real-time (proposed) transactions stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3913bool
3915{
3917 return mStreamMaps[sRTTransactions].emplace(isrListener->getSeq(), isrListener).second;
3918}
3919
3920// <-- bool: true=erased, false=was not there
3921bool
3927
3928// <-- bool: true=added, false=already there
// Subscribe a listener to the "validations" stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3929bool
3931{
3933 return mStreamMaps[sValidations].emplace(isrListener->getSeq(), isrListener).second;
3934}
3935
3936void
3941
3942// <-- bool: true=erased, false=was not there
3943bool
3949
3950// <-- bool: true=added, false=already there
// Subscribe a listener to the "peer_status" stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3951bool
3953{
3955 return mStreamMaps[sPeerStatus].emplace(isrListener->getSeq(), isrListener).second;
3956}
3957
3958// <-- bool: true=erased, false=was not there
3959bool
3965
3966// <-- bool: true=added, false=already there
// Subscribe a listener to the "consensus" phase stream.
// Returns true if newly added, false if already subscribed.
// NOTE(review): signature and lock lines are missing from this extract.
3967bool
3969{
3971 return mStreamMaps[sConsensusPhase].emplace(isrListener->getSeq(), isrListener).second;
3972}
3973
3974// <-- bool: true=erased, false=was not there
3975bool
3981
// Look up an RPC (URL-keyed) subscription entry; returns an empty
// InfoSub::pointer when the URL is not registered.
// NOTE(review): signature and lock lines are missing from this extract.
3984{
3986
3987 subRpcMapType::iterator it = mRpcSubMap.find(strUrl);
3988
3989 if (it != mRpcSubMap.end())
3990 return it->second;
3991
// Not found: return a null pointer rather than throwing.
3992 return InfoSub::pointer();
3993}
3994
// Register an RPC (URL-keyed) subscription entry and return it.
// emplace does not overwrite: if the URL is already present the existing
// map entry is kept, and the passed-in entry is returned regardless.
// NOTE(review): signature and lock lines are missing from this extract.
3997{
3999
4000 mRpcSubMap.emplace(strUrl, rspEntry);
4001
4002 return rspEntry;
4003}
4004
// Remove a URL-keyed RPC subscription, but only if no stream map still
// references the listener. Returns true on removal, false if the URL is
// unknown or the listener is still subscribed to some stream.
// NOTE(review): signature and lock lines are missing from this extract.
4005bool
4007{
4009 auto pInfo = findRpcSub(strUrl);
4010
4011 if (!pInfo)
4012 return false;
4013
4014 // check to see if any of the stream maps still hold a weak reference to
4015 // this entry before removing
4016 for (SubMapType const& map : mStreamMaps)
4017 {
4018 if (map.find(pInfo->getSeq()) != map.end())
4019 return false;
4020 }
4021 mRpcSubMap.erase(strUrl);
4022 return true;
4023}
4024
4025#ifndef USE_NEW_BOOK_PAGE
4026
4027// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4028// work, but it demonstrated poor performance.
4029//
// Build a JSON page of offers for one order book (the "old" book-page
// path, active because USE_NEW_BOOK_PAGE is not defined). Walks the book
// directories from best quality onward, computing for each offer how much
// of TakerGets its owner can actually fund, and appends every offer
// (funded or not) to jvResult[jss::offers] with quality and, on the
// owner's first offer seen, owner_funds.
// NOTE(review): the signature lines (including the lpLedger parameter) and
// the declaration of the running-balance map `umBalance` are missing from
// this extract -- confirm against upstream.
4030void
4033 Book const& book,
4034 AccountID const& uTakerID,
4035 bool const bProof,
4036 unsigned int iLimit,
4037 Json::Value const& jvMarker,
4038 Json::Value& jvResult)
4039{ // CAUTION: This is the old get book page logic
4040 Json::Value& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4041
// The book occupies the key range [uBookBase, uBookEnd): directories are
// ordered by quality, so iterating succ() visits best offers first.
4043 uint256 const uBookBase = getBookBase(book);
4044 uint256 const uBookEnd = getQualityNext(uBookBase);
4045 uint256 uTipIndex = uBookBase;
4046
4047 if (auto stream = m_journal.trace())
4048 {
4049 stream << "getBookPage:" << book;
4050 stream << "getBookPage: uBookBase=" << uBookBase;
4051 stream << "getBookPage: uBookEnd=" << uBookEnd;
4052 stream << "getBookPage: uTipIndex=" << uTipIndex;
4053 }
4054
4055 ReadView const& view = *lpLedger;
4056
4057 bool const bGlobalFreeze = isGlobalFrozen(view, book.out.account) || isGlobalFrozen(view, book.in.account);
4058
// bDirectAdvance: move to the next quality directory; bDone: book ended.
4059 bool bDone = false;
4060 bool bDirectAdvance = true;
4061
4062 std::shared_ptr<SLE const> sleOfferDir;
4063 uint256 offerIndex;
4064 unsigned int uBookEntry;
4065 STAmount saDirRate;
4066
// Issuer transfer rate: owners of offers in someone else's IOUs pay it.
4067 auto const rate = transferRate(view, book.out.account);
4068 auto viewJ = app_.journal("View");
4069
4070 while (!bDone && iLimit-- > 0)
4071 {
4072 if (bDirectAdvance)
4073 {
4074 bDirectAdvance = false;
4075
4076 JLOG(m_journal.trace()) << "getBookPage: bDirectAdvance";
4077
// Find the next directory page at or after uTipIndex within the book.
4078 auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4079 if (ledgerIndex)
4080 sleOfferDir = view.read(keylet::page(*ledgerIndex));
4081 else
4082 sleOfferDir.reset();
4083
4084 if (!sleOfferDir)
4085 {
4086 JLOG(m_journal.trace()) << "getBookPage: bDone";
4087 bDone = true;
4088 }
4089 else
4090 {
// All offers in this directory share the quality encoded in its key.
4091 uTipIndex = sleOfferDir->key();
4092 saDirRate = amountFromQuality(getQuality(uTipIndex));
4093
4094 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4095
4096 JLOG(m_journal.trace()) << "getBookPage: uTipIndex=" << uTipIndex;
4097 JLOG(m_journal.trace()) << "getBookPage: offerIndex=" << offerIndex;
4098 }
4099 }
4100
4101 if (!bDone)
4102 {
4103 auto sleOffer = view.read(keylet::offer(offerIndex));
4104
4105 if (sleOffer)
4106 {
4107 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4108 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4109 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4110 STAmount saOwnerFunds;
4111 bool firstOwnerOffer(true);
4112
4113 if (book.out.account == uOfferOwnerID)
4114 {
4115 // If an offer is selling issuer's own IOUs, it is fully
4116 // funded.
4117 saOwnerFunds = saTakerGets;
4118 }
4119 else if (bGlobalFreeze)
4120 {
4121 // If either asset is globally frozen, consider all offers
4122 // that aren't ours to be totally unfunded
4123 saOwnerFunds.clear(book.out);
4124 }
4125 else
4126 {
// umBalance carries each owner's remaining funds forward across their
// offers so later offers are shown as funded only by what's left.
4127 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4128 if (umBalanceEntry != umBalance.end())
4129 {
4130 // Found in running balance table.
4131
4132 saOwnerFunds = umBalanceEntry->second;
4133 firstOwnerOffer = false;
4134 }
4135 else
4136 {
4137 // Did not find balance in table.
4138
4139 saOwnerFunds = accountHolds(
4140 view, uOfferOwnerID, book.out.currency, book.out.account, fhZERO_IF_FROZEN, viewJ);
4141
4142 if (saOwnerFunds < beast::zero)
4143 {
4144 // Treat negative funds as zero.
4145
4146 saOwnerFunds.clear();
4147 }
4148 }
4149 }
4150
4151 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4152
4153 STAmount saTakerGetsFunded;
4154 STAmount saOwnerFundsLimit = saOwnerFunds;
4155 Rate offerRate = parityRate;
4156
4157 if (rate != parityRate
4158 // Have a transfer fee.
4159 && uTakerID != book.out.account
4160 // Not taking offers of own IOUs.
4161 && book.out.account != uOfferOwnerID)
4162 // Offer owner not issuing ownfunds
4163 {
4164 // Need to charge a transfer fee to offer owner.
4165 offerRate = rate;
4166 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4167 }
4168
4169 if (saOwnerFundsLimit >= saTakerGets)
4170 {
4171 // Sufficient funds no shenanigans.
4172 saTakerGetsFunded = saTakerGets;
4173 }
4174 else
4175 {
4176 // Only provide, if not fully funded.
4177
// Partially funded: report the funded portions of both sides.
4178 saTakerGetsFunded = saOwnerFundsLimit;
4179
4180 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4181 std::min(saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4182 .setJson(jvOffer[jss::taker_pays_funded]);
4183 }
4184
// What the owner actually pays out (funded amount plus transfer fee,
// capped at the owner's funds); deduct it from the running balance.
4185 STAmount saOwnerPays = (parityRate == offerRate)
4186 ? saTakerGetsFunded
4187 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4188
4189 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4190
4191 // Include all offers funded and unfunded
4192 Json::Value& jvOf = jvOffers.append(jvOffer);
4193 jvOf[jss::quality] = saDirRate.getText();
4194
4195 if (firstOwnerOffer)
4196 jvOf[jss::owner_funds] = saOwnerFunds.getText();
4197 }
4198 else
4199 {
4200 JLOG(m_journal.warn()) << "Missing offer";
4201 }
4202
// Advance within the directory; when exhausted, jump to the next one.
4203 if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4204 {
4205 bDirectAdvance = true;
4206 }
4207 else
4208 {
4209 JLOG(m_journal.trace()) << "getBookPage: offerIndex=" << offerIndex;
4210 }
4211 }
4212 }
4213
4214 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4215 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4216}
4217
4218#else
4219
4220// This is the new code that uses the book iterators
4221// It has temporarily been disabled
4222
// Alternative book-page implementation using OrderBookIterator (compiled
// only when USE_NEW_BOOK_PAGE is defined; currently disabled for
// performance reasons per the comment above). Same funding computation as
// the old path, but it only emits offers that are funded or owned by the
// taker.
// NOTE(review): the signature lines and the `umBalance` declaration are
// missing from this extract -- confirm against upstream.
4223void
4226 Book const& book,
4227 AccountID const& uTakerID,
4228 bool const bProof,
4229 unsigned int iLimit,
4230 Json::Value const& jvMarker,
4231 Json::Value& jvResult)
4232{
4233 auto& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4234
4236
4237 MetaView lesActive(lpLedger, tapNONE, true);
4238 OrderBookIterator obIterator(lesActive, book);
4239
// Issuer transfer rate applies to owners selling someone else's IOUs.
4240 auto const rate = transferRate(lesActive, book.out.account);
4241
4242 bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.out.account) || lesActive.isGlobalFrozen(book.in.account);
4243
4244 while (iLimit-- > 0 && obIterator.nextOffer())
4245 {
4246 SLE::pointer sleOffer = obIterator.getCurrentOffer();
4247 if (sleOffer)
4248 {
4249 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4250 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4251 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4252 STAmount saDirRate = obIterator.getCurrentRate();
4253 STAmount saOwnerFunds;
4254
4255 if (book.out.account == uOfferOwnerID)
4256 {
4257 // If offer is selling issuer's own IOUs, it is fully funded.
4258 saOwnerFunds = saTakerGets;
4259 }
4260 else if (bGlobalFreeze)
4261 {
4262 // If either asset is globally frozen, consider all offers
4263 // that aren't ours to be totally unfunded
4264 saOwnerFunds.clear(book.out);
4265 }
4266 else
4267 {
// Running balance carries an owner's remaining funds across offers.
4268 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4269
4270 if (umBalanceEntry != umBalance.end())
4271 {
4272 // Found in running balance table.
4273
4274 saOwnerFunds = umBalanceEntry->second;
4275 }
4276 else
4277 {
4278 // Did not find balance in table.
4279
4280 saOwnerFunds =
4281 lesActive.accountHolds(uOfferOwnerID, book.out.currency, book.out.account, fhZERO_IF_FROZEN);
4282
4283 if (saOwnerFunds.isNegative())
4284 {
4285 // Treat negative funds as zero.
4286
4287 saOwnerFunds.zero();
4288 }
4289 }
4290 }
4291
4292 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4293
4294 STAmount saTakerGetsFunded;
4295 STAmount saOwnerFundsLimit = saOwnerFunds;
4296 Rate offerRate = parityRate;
4297
4298 if (rate != parityRate
4299 // Have a transfer fee.
4300 && uTakerID != book.out.account
4301 // Not taking offers of own IOUs.
4302 && book.out.account != uOfferOwnerID)
4303 // Offer owner not issuing ownfunds
4304 {
4305 // Need to charge a transfer fee to offer owner.
4306 offerRate = rate;
4307 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4308 }
4309
4310 if (saOwnerFundsLimit >= saTakerGets)
4311 {
4312 // Sufficient funds no shenanigans.
4313 saTakerGetsFunded = saTakerGets;
4314 }
4315 else
4316 {
4317 // Only provide, if not fully funded.
4318 saTakerGetsFunded = saOwnerFundsLimit;
4319
4320 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4321
4322 // TODO(tom): The result of this expression is not used - what's
4323 // going on here?
// (Review note: the result IS used -- setJson writes the min into
// jvOffer[jss::taker_pays_funded]; the TODO above is stale.)
4324 std::min(saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4325 .setJson(jvOffer[jss::taker_pays_funded]);
4326 }
4327
4328 STAmount saOwnerPays = (parityRate == offerRate)
4329 ? saTakerGetsFunded
4330 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4331
4332 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4333
4334 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4335 {
4336 // Only provide funded offers and offers of the taker.
4337 Json::Value& jvOf = jvOffers.append(jvOffer);
4338 jvOf[jss::quality] = saDirRate.getText();
4339 }
4340 }
4341 }
4342
4343 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4344 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4345}
4346
4347#endif
4348
// Publish per-operating-mode accounting (time spent and number of
// transitions for each of the five modes) to the insight gauges. Takes a
// snapshot of the counters and folds the still-running current interval
// into the active mode's duration before reporting.
// NOTE(review): the signature line and a lock line before the gauge
// updates are missing from this extract -- confirm against upstream.
4349inline void
4351{
4352 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
// Add the elapsed time of the in-progress interval to the current mode.
4353 auto const current =
4354 std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - start);
4355 counters[static_cast<std::size_t>(mode)].dur += current;
4356
4358 m_stats.disconnected_duration.set(counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].dur.count());
4359 m_stats.connected_duration.set(counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].dur.count());
4360 m_stats.syncing_duration.set(counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4361 m_stats.tracking_duration.set(counters[static_cast<std::size_t>(OperatingMode::TRACKING)].dur.count());
4362 m_stats.full_duration.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4363
4364 m_stats.disconnected_transitions.set(counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions);
4365 m_stats.connected_transitions.set(counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].transitions);
4366 m_stats.syncing_transitions.set(counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4367 m_stats.tracking_transitions.set(counters[static_cast<std::size_t>(OperatingMode::TRACKING)].transitions);
4368 m_stats.full_transitions.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4369}
4370
// Record a transition into operating mode `om`: bump the target mode's
// transition counter, close out the elapsed interval on the previous
// mode, and on the very first entry into FULL capture the initial-sync
// duration measured from process start.
// NOTE(review): the signature line is missing from this extract.
4371void
4373{
4374 auto now = std::chrono::steady_clock::now();
4375
4376 std::lock_guard lock(mutex_);
4377 ++counters_[static_cast<std::size_t>(om)].transitions;
// First-ever transition to FULL defines the initial sync duration.
4378 if (om == OperatingMode::FULL && counters_[static_cast<std::size_t>(om)].transitions == 1)
4379 {
4380 initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(now - processStart_).count();
4381 }
// Credit the time since the last transition to the outgoing mode.
4382 counters_[static_cast<std::size_t>(mode_)].dur +=
4383 std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4384
4385 mode_ = om;
4386 start_ = now;
4387}
4388
// Emit the state accounting as JSON into `obj`: per-mode transition count
// and cumulative duration (state_accounting), the duration of the current
// state (server_state_duration_us), and, once the server has been FULL,
// the initial sync duration (initial_sync_duration_us).
// NOTE(review): the signature line and the opening line of the for-loop
// are missing from this extract -- confirm against upstream.
4389void
4391{
// Snapshot the counters, then fold in the still-open current interval.
4392 auto [counters, mode, start, initialSync] = getCounterData();
4393 auto const current =
4394 std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - start);
4395 counters[static_cast<std::size_t>(mode)].dur += current;
4396
4397 obj[jss::state_accounting] = Json::objectValue;
4399 i <= static_cast<std::size_t>(OperatingMode::FULL);
4400 ++i)
4401 {
4402 obj[jss::state_accounting][states_[i]] = Json::objectValue;
4403 auto& state = obj[jss::state_accounting][states_[i]];
// Counts/durations serialized as strings to avoid JSON integer limits.
4404 state[jss::transitions] = std::to_string(counters[i].transitions);
4405 state[jss::duration_us] = std::to_string(counters[i].dur.count());
4406 }
4407 obj[jss::server_state_duration_us] = std::to_string(current.count());
4408 if (initialSync)
4409 obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4410}
4411
4412//------------------------------------------------------------------------------
4413
// Factory: construct the concrete NetworkOPsImp behind the NetworkOPs
// interface, forwarding every dependency unchanged.
// NOTE(review): the return-type/name lines and two parameter/argument
// lines are missing from this extract (likely the clock parameter and the
// make_unique call) -- confirm against upstream.
4416 Application& app,
4418 bool standalone,
4419 std::size_t minPeerCount,
4420 bool startvalid,
4421 JobQueue& job_queue,
4423 ValidatorKeys const& validatorKeys,
4424 boost::asio::io_context& io_svc,
4425 beast::Journal journal,
4426 beast::insight::Collector::ptr const& collector)
4427{
4429 app,
4430 clock,
4431 standalone,
4432 minPeerCount,
4433 startvalid,
4434 job_queue,
4436 validatorKeys,
4437 io_svc,
4438 journal,
4439 collector);
4440}
4441
4442} // namespace xrpl
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:45
Represents a JSON value.
Definition json_value.h:131
Json::UInt UInt
Definition json_value.h:138
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
A metric for measuring an integral value.
Definition Gauge.h:21
void set(value_type value) const
Set the value on the gauge.
Definition Gauge.h:49
A reference to a handler for performing polled collection.
Definition Hook.h:13
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual perf::PerfLog & getPerfLog()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual std::chrono::milliseconds getIOLatency()=0
virtual TxQ & getTxQ()=0
virtual Cluster & cluster()=0
virtual Config & config()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NodeStore::Database & getNodeStore()=0
virtual ServerHandler & getServerHandler()=0
virtual TimeKeeper & timeKeeper()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual OpenLedger & openLedger()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual Overlay & overlay()=0
virtual JobQueue & getJobQueue()=0
virtual ManifestCache & validatorManifests()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
virtual ValidatorList & validators()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Specifies an order book.
Definition Book.h:17
Issue in
Definition Book.h:19
Issue out
Definition Book.h:20
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::uint32_t getLoadFee() const
Definition ClusterNode.h:33
NetClock::time_point getReportTime() const
Definition ClusterNode.h:39
PublicKey const & identity() const
Definition ClusterNode.h:45
std::string const & name() const
Definition ClusterNode.h:27
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:30
uint32_t NETWORK_ID
Definition Config.h:138
std::string SERVER_DOMAIN
Definition Config.h:259
int RELAY_UNTRUSTED_VALIDATIONS
Definition Config.h:151
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
Definition Config.h:142
std::size_t NODE_SIZE
Definition Config.h:194
virtual Json::Value getInfo()=0
virtual void clearFailures()=0
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:35
Currency currency
Definition Issue.h:16
AccountID account
Definition Issue.h:17
A pool of threads to perform work.
Definition JobQueue.h:38
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:146
Json::Value getJson(int c=0)
Definition JobQueue.cpp:172
std::chrono::seconds getValidatedLedgerAge()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
bool haveValidated()
Whether we have ever fully validated a ledger.
std::size_t getFetchPackCacheSize() const
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::shared_ptr< Ledger const > getValidatedLedger()
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< ReadView const > getCurrentLedger()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
std::uint32_t getLoadBase() const
Manages load sources.
Definition LoadManager.h:27
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
Definition Manifest.cpp:285
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void json(Json::Value &obj) const
Output state counters in JSON format.
std::chrono::steady_clock::time_point const processStart_
static std::array< Json::StaticString const, 5 > const states_
std::array< Counters, 5 > counters_
void mode(OperatingMode om)
Record state transition.
std::chrono::steady_clock::time_point start_
Transaction with input flags and results to be applied in batches.
std::shared_ptr< Transaction > const transaction
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::string getHostId(bool forAdmin)
void reportConsensusStateChange(ConsensusPhase phase)
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void clearNeedNetworkLedger() override
ServerFeeSummary mLastFeeSummary
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
DispatchState mDispatchState
std::size_t const minPeerCount_
beast::Journal m_journal
static std::array< char const *, 5 > const states_
std::set< uint256 > pendingValidations_
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
bool unsubManifests(std::uint64_t uListener) override
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subManifests(InfoSub::ref ispListener) override
void stateAccounting(Json::Value &obj) override
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void stop() override
SubInfoMapType mSubRTAccount
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
void transactionBatch()
Apply transactions in batches.
bool unsubRTTransactions(std::uint64_t uListener) override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
bool processTrustedProposal(RCLCxPeerPos proposal) override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
void pubValidation(std::shared_ptr< STValidation > const &val) override
bool subBook(InfoSub::ref ispListener, Book const &) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
std::atomic< OperatingMode > mMode
void setMode(OperatingMode om) override
void setAmendmentBlocked() override
void pubConsensus(ConsensusPhase phase)
bool const m_standalone
std::recursive_mutex mSubLock
bool isNeedNetworkLedger() override
DispatchState
Synchronization states for transaction batches.
std::atomic< bool > needNetworkLedger_
boost::asio::steady_timer heartbeatTimer_
bool subConsensus(InfoSub::ref ispListener) override
bool unsubBook(std::uint64_t uListener, Book const &) override
bool unsubLedger(std::uint64_t uListener) override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
std::optional< PublicKey > const validatorPK_
std::atomic< bool > amendmentBlocked_
bool isFull() override
void clearAmendmentWarned() override
void updateLocalTx(ReadView const &view) override
void clearLedgerFetch() override
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool isAmendmentBlocked() override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
std::unique_ptr< LocalTxs > m_localTX
void setStandAlone() override
void setNeedNetworkLedger() override
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
bool unsubServer(std::uint64_t uListener) override
SubAccountHistoryMapType mSubAccountHistory
bool unsubConsensus(std::uint64_t uListener) override
std::condition_variable mCond
void pubManifest(Manifest const &) override
RCLConsensus mConsensus
void consensusViewChange() override
boost::asio::steady_timer accountHistoryTxTimer_
Json::Value getConsensusInfo() override
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void setUNLBlocked() override
bool unsubValidations(std::uint64_t uListener) override
bool subPeerStatus(InfoSub::ref ispListener) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
ConsensusPhase mLastConsensusPhase
OperatingMode getOperatingMode() const override
std::optional< PublicKey > const validatorMasterPK_
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
NetworkOPsImp(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
std::vector< TransactionStatus > mTransactions
bool tryRemoveRpcSub(std::string const &strUrl) override
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void submitTransaction(std::shared_ptr< STTx const > const &) override
void setAmendmentWarned() override
LedgerMaster & m_ledgerMaster
Json::Value getServerInfo(bool human, bool admin, bool counters) override
StateAccounting accounting_
bool subValidations(InfoSub::ref ispListener) override
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool subRTTransactions(InfoSub::ref ispListener) override
std::atomic< bool > unlBlocked_
subRpcMapType mRpcSubMap
bool unsubBookChanges(std::uint64_t uListener) override
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
void setStateTimer() override
Called to initially start our timers.
std::size_t getLocalTxCount() override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
bool unsubTransactions(std::uint64_t uListener) override
bool isAmendmentWarned() override
bool subTransactions(InfoSub::ref ispListener) override
std::mutex m_statsMutex
std::mutex validationsMutex_
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
SubInfoMapType mSubAccount
void clearUNLBlocked() override
bool isUNLBlocked() override
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
std::atomic< bool > amendmentWarned_
boost::asio::steady_timer clusterTimer_
bool unsubPeerStatus(std::uint64_t uListener) override
void reportFeeChange() override
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isBlocked() override
~NetworkOPsImp() override
Application & app_
Json::Value getLedgerFetchInfo() override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subBookChanges(InfoSub::ref ispListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:70
void getCountsJson(Json::Value &obj)
Definition Database.cpp:225
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:46
BookListeners::pointer getBookListeners(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnectCharges() const =0
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Json::Value getJson(bool full) const
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:44
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:396
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:32
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
Issue const & issue() const
Definition STAmount.h:455
std::string getText() const override
Definition STAmount.cpp:639
void setJson(Json::Value &) const
Definition STAmount.cpp:599
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
Definition Serializer.h:51
void const * data() const noexcept
Definition Serializer.h:57
void setup(Setup const &setup, beast::Journal journal)
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:44
std::chrono::seconds closeOffset() const
Definition TimeKeeper.h:63
time_point closeTime() const
Returns the predicted close time, in network time.
Definition TimeKeeper.h:56
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Definition TxQ.cpp:1603
static time_point now()
Validator keys and manifest as set in configuration file.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::size_t quorum() const
Get quorum value for current trusted key set.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
Json::Value jsonClipped() const
Definition XRPAmount.h:197
constexpr double decimalXRP() const
Definition XRPAmount.h:241
iterator begin()
Definition base_uint.h:113
bool isZero() const
Definition base_uint.h:509
static constexpr std::size_t size()
Definition base_uint.h:495
bool isNonZero() const
Definition base_uint.h:514
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:198
T clear(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_same_v
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
int Int
unsigned int UInt
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:15
STL namespace.
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:51
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:34
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:28
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:235
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:160
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:331
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:50
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:69
@ terQUEUED
Definition TER.h:206
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool isTerRetry(TER x) noexcept
Definition TER.h:644
@ fhZERO_IF_FROZEN
Definition View.h:59
@ fhIGNORE_FREEZE
Definition View.h:59
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ INVALID
Definition Transaction.h:29
@ OBSOLETE
Definition Transaction.h:35
@ INCLUDED
Definition Transaction.h:30
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:42
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:598
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
@ tefPAST_SEQ
Definition TER.h:156
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:126
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
FeeSetup setup_FeeVote(Section const &section)
Definition Config.cpp:1022
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j, SpendableHandling includeFullBalance=shSIMPLE_BALANCE)
Definition View.cpp:392
Number root(Number f, unsigned d)
Definition Number.cpp:938
std::unique_ptr< NetworkOPs > make_NetworkOPs(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startvalid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:228
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
Definition View.cpp:112
STAmount multiply(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:34
static auto const genesisAccountId
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:57
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
Definition View.cpp:101
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
@ current
This was a new validation and was added.
constexpr std::size_t maxPoppedTransactions
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
Definition View.cpp:515
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Definition View.cpp:138
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:927
@ jtNETOP_CLUSTER
Definition Job.h:55
@ jtCLIENT_CONSENSUS
Definition Job.h:28
@ jtTXN_PROC
Definition Job.h:62
@ jtCLIENT_ACCT_HIST
Definition Job.h:29
@ jtTRANSACTION
Definition Job.h:42
@ jtCLIENT_FEE_CHANGE
Definition Job.h:27
@ jtBATCH
Definition Job.h:45
bool isTefFailure(TER x) noexcept
Definition TER.h:638
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
Definition View.cpp:699
auto constexpr muldiv_max
Definition mulDiv.h:9
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:119
ConsensusPhase
Phases of consensus for a single ledger round.
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:55
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:145
AccountID calcAccountID(PublicKey const &pk)
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:98
Json::Value rpcError(error_code_i iError)
Definition RPCErr.cpp:12
std::string to_string_iso(date::sys_time< Duration > tp)
Definition chrono.h:68
std::unique_ptr< LocalTxs > make_LocalTxs()
Definition LocalTxs.cpp:170
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:21
ApplyFlags
Definition ApplyView.h:11
@ tapNONE
Definition ApplyView.h:12
@ tapFAIL_HARD
Definition ApplyView.h:16
@ tapUNLIMITED
Definition ApplyView.h:23
bool isTelLocal(TER x) noexcept
Definition TER.h:626
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
@ temINVALID_FLAG
Definition TER.h:92
@ temBAD_SIGNATURE
Definition TER.h:86
bool isTesSuccess(TER x) noexcept
Definition TER.h:650
static std::uint32_t trunc32(std::uint64_t v)
static std::array< char const *, 5 > const stateNames
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:49
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:767
bool isTemMalformed(TER x) noexcept
Definition TER.h:632
@ tesSUCCESS
Definition TER.h:226
error_code_i
Definition ErrorCodes.h:21
@ rpcINTERNAL
Definition ErrorCodes.h:111
@ rpcINVALID_PARAMS
Definition ErrorCodes.h:65
@ rpcSUCCESS
Definition ErrorCodes.h:25
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
@ warnRPC_AMENDMENT_BLOCKED
Definition ErrorCodes.h:154
@ warnRPC_UNSUPPORTED_MAJORITY
Definition ErrorCodes.h:153
@ warnRPC_EXPIRED_VALIDATOR_LIST
Definition ErrorCodes.h:155
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:67
std::string serialized
The manifest in serialized form.
Definition Manifest.h:64
Blob getMasterSignature() const
Returns manifest master key signature.
Definition Manifest.cpp:221
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:79
std::optional< Blob > getSignature() const
Returns manifest signature.
Definition Manifest.cpp:210
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:73
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:76
Server fees published on server subscription.
std::optional< TxQ::Metrics > em
bool operator!=(ServerFeeSummary const &b) const
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge connected_transitions
beast::insight::Gauge full_transitions
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge full_duration
beast::insight::Hook hook
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Definition Rate.h:21
Data format for exchanging consumption information across peers.
Definition Gossip.h:13
std::vector< Item > items
Definition Gossip.h:25
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:146
void set(char const *key, auto const &v)
IsMemberResult isMember(char const *key) const
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:116
Sends a message to all peers.
Definition predicates.h:13
T swap(T... args)
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)