From f1377d5d309d34f9d94d48b2e91422da72b41fb8 Mon Sep 17 00:00:00 2001 From: Brad Chase Date: Tue, 7 Feb 2017 10:21:28 -0500 Subject: [PATCH] Publish server stream when fee changes (RIPD-1406): Resolves #1991 Publish a server status update after every ledger close or open ledger update if there is a change in fees. --- .../app/ledger/impl/LedgerConsensusImp.cpp | 3 + src/ripple/app/misc/NetworkOPs.cpp | 127 +++++++++++-- src/test/app/TxQ_test.cpp | 170 ++++++++++++++++++ 3 files changed, 282 insertions(+), 18 deletions(-) diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp index 49f88f3573..88a153cdc9 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp @@ -1009,6 +1009,9 @@ void LedgerConsensusImp::accept (TxSet_t const& set) // Stuff the ledger with transactions from the queue. return app_.getTxQ().accept(app_, view); }); + // Signal a potential fee change to subscribers after the open ledger + // is created + app_.getOPs().reportFeeChange(); } ledgerMaster_.switchLCL (sharedLCL); diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 15e351fdd4..2829bd2657 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -180,6 +180,30 @@ class NetworkOPsImp final Json::Value json() const; }; + //! Server fees published on `server` subscription + struct ServerFeeSummary + { + ServerFeeSummary() = default; + + ServerFeeSummary(std::uint64_t fee, + boost::optional&& escalationMetrics, + LoadFeeTrack const & loadFeeTrack); + bool + operator !=(ServerFeeSummary const & b) const; + + bool + operator ==(ServerFeeSummary const & b) const + { + return !(*this != b); + } + + std::uint32_t loadFactorServer = 256; + std::uint32_t loadBaseServer = 256; + std::uint64_t baseFee = 10; + boost::optional em = boost::none; + }; + + public: // VFALCO TODO Make LedgerMaster a SharedPtr or a reference. // @@ -202,8 +226,6 @@ public: , mLedgerConsensus (mConsensus->makeLedgerConsensus ( app, app.getInboundTransactions(), ledgerMaster, *m_localTX)) , m_ledgerMaster (ledgerMaster) - , mLastLoadBase (256) - , mLastLoadFactor (256) , m_job_queue (job_queue) , m_standalone (standalone) , m_network_quorum (start_valid ? 0 : network_quorum) @@ -557,8 +579,8 @@ private: SubMapType mSubValidations; // Received validations. SubMapType mSubPeerStatus; // peer status changes - std::uint32_t mLastLoadBase; - std::uint32_t mLastLoadFactor; + ServerFeeSummary mLastFeeSummary; + JobQueue& m_job_queue; @@ -952,6 +974,7 @@ void NetworkOPsImp::apply (std::unique_lock& batchLock) { auto lock = make_lock(app_.getMasterMutex()); + bool changed = false; { std::lock_guard lock ( m_ledgerMaster.peekMutex()); @@ -959,7 +982,6 @@ void NetworkOPsImp::apply (std::unique_lock& batchLock) app_.openLedger().modify( [&](OpenView& view, beast::Journal j) { - bool changed = false; for (TransactionStatus& e : transactions) { // we check before addingto the batch @@ -977,6 +999,8 @@ void NetworkOPsImp::apply (std::unique_lock& batchLock) return changed; }); } + if (changed) + reportFeeChange(); auto newOL = app_.openLedger().current(); for (TransactionStatus& e : transactions) @@ -1601,6 +1625,38 @@ void NetworkOPsImp::pubManifest (Manifest const& mo) } } +NetworkOPsImp::ServerFeeSummary::ServerFeeSummary( + std::uint64_t fee, + boost::optional&& escalationMetrics, + LoadFeeTrack const & loadFeeTrack) + : loadFactorServer{loadFeeTrack.getLoadFactor()} + , loadBaseServer{loadFeeTrack.getLoadBase()} + , baseFee{fee} + , em{std::move(escalationMetrics)} +{ + +} + + +bool +NetworkOPsImp::ServerFeeSummary::operator !=(NetworkOPsImp::ServerFeeSummary const & b) const +{ + if(loadFactorServer != b.loadFactorServer || + loadBaseServer != b.loadBaseServer || + baseFee != b.baseFee || + em.is_initialized() != b.em.is_initialized()) + return true; + + if(em) + { + return (em->minFeeLevel != b.em->minFeeLevel || + em->expFeeLevel != b.em->expFeeLevel || + em->referenceFeeLevel != b.em->referenceFeeLevel); + } + + return false; +} + void NetworkOPsImp::pubServer () { // VFALCO TODO Don't hold the lock across calls to send...make a copy of the @@ -1612,14 +1668,45 @@ void NetworkOPsImp::pubServer () if (!mSubServer.empty ()) { Json::Value jvObj (Json::objectValue); - auto const& feeTrack = app_.getFeeTrack(); - jvObj [jss::type] = "serverStatus"; + ServerFeeSummary f{app_.openLedger().current()->fees().base, + app_.getTxQ().getMetrics(*app_.openLedger().current()), + app_.getFeeTrack()}; + + // Need to cap to uint64 to uint32 due to JSON limitations + auto clamp = [](std::uint64_t v) + { + constexpr std::uint64_t max32 = + std::numeric_limits::max(); + + return static_cast(std::min(max32, v)); + }; + + + jvObj [jss::type] = "serverStatus"; jvObj [jss::server_status] = strOperatingMode (); - jvObj [jss::load_base] = - (mLastLoadBase = feeTrack.getLoadBase ()); - jvObj [jss::load_factor] = - (mLastLoadFactor = feeTrack.getLoadFactor ()); + jvObj [jss::load_base] = f.loadBaseServer; + jvObj [jss::load_factor_server] = f.loadFactorServer; + jvObj [jss::base_fee] = clamp(f.baseFee); + + if(f.em) + { + auto const loadFactor = + std::max(static_cast(f.loadFactorServer), + mulDiv(f.em->expFeeLevel, f.loadBaseServer, + f.em->referenceFeeLevel).second); + + jvObj [jss::load_factor] = clamp(loadFactor); + jvObj [jss::load_factor_fee_escalation] = clamp(f.em->expFeeLevel); + jvObj [jss::load_factor_fee_queue] = clamp(f.em->minFeeLevel); + jvObj [jss::load_factor_fee_reference] + = clamp(f.em->referenceFeeLevel); + + } + else + jvObj [jss::load_factor] = f.loadFactorServer; + + mLastFeeSummary = f; for (auto i = mSubServer.begin (); i != mSubServer.end (); ) { @@ -2108,6 +2195,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) constexpr std::uint64_t max32 = std::numeric_limits::max(); + auto const loadFactorServer = app_.getFeeTrack().getLoadFactor(); auto const loadBaseServer = app_.getFeeTrack().getLoadBase(); auto const loadFactorFeeEscalation = escalationMetrics ? @@ -2367,14 +2455,17 @@ void NetworkOPsImp::pubLedger ( void NetworkOPsImp::reportFeeChange () { - auto const& feeTrack = app_.getFeeTrack(); - if ((feeTrack.getLoadBase () == mLastLoadBase) && - (feeTrack.getLoadFactor () == mLastLoadFactor)) - return; + ServerFeeSummary f{app_.openLedger().current()->fees().base, + app_.getTxQ().getMetrics(*app_.openLedger().current()), + app_.getFeeTrack()}; - m_job_queue.addJob ( - jtCLIENT, "reportFeeChange->pubServer", - [this] (Job&) { pubServer(); }); + + // only schedule the job if something has changed + if (f != mLastFeeSummary) + { + m_job_queue.addJob ( jtCLIENT, "reportFeeChange->pubServer", + [this] (Job&) { pubServer(); }); + } } // This routine should only be used to publish accepted or validated diff --git a/src/test/app/TxQ_test.cpp b/src/test/app/TxQ_test.cpp index bc0b6098e7..14e84fa383 100644 --- a/src/test/app/TxQ_test.cpp +++ b/src/test/app/TxQ_test.cpp @@ -32,6 +32,7 @@ #include #include #include +#include namespace ripple { namespace test { @@ -2416,6 +2417,174 @@ public: } } + void testServerSubscribe() + { + using namespace jtx; + + Env env(*this, makeConfig({ { "minimum_txn_in_ledger_standalone", "3" } }), + features(featureFeeEscalation)); + + Json::Value stream; + stream[jss::streams] = Json::arrayValue; + stream[jss::streams].append("server"); + auto wsc = makeWSClient(env.app().config()); + { + auto jv = wsc->invoke("subscribe", stream); + BEAST_EXPECT(jv[jss::status] == "success"); + } + + Account a{"a"}, b{"b"}, c{"c"}, d{"d"}, e{"e"}, f{"f"}, + g{"g"}, h{"h"}, i{"i"}; + + + // Fund the first few accounts at non escalated fee + env.fund(XRP(50000), noripple(a,b,c,d)); + checkMetrics(env, 0, boost::none, 4, 3, 256); + + // First transaction establishes the messaging + BEAST_EXPECT(wsc->findMsg(5s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus" && + jv.isMember(jss::load_factor) && + jv[jss::load_factor] == 256 && + jv.isMember(jss::load_base) && + jv[jss::load_base] == 256 && + jv.isMember(jss::load_factor_server) && + jv[jss::load_factor_server] == 256 && + jv.isMember(jss::load_factor_fee_escalation) && + jv[jss::load_factor_fee_escalation] == 256 && + jv.isMember(jss::load_factor_fee_queue) && + jv[jss::load_factor_fee_queue] == 256 && + jv.isMember(jss::load_factor_fee_reference) && + jv[jss::load_factor_fee_reference] == 256; + })); + // Last transaction escalates the fee + BEAST_EXPECT(wsc->findMsg(5s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus" && + jv.isMember(jss::load_factor) && + jv[jss::load_factor] == 227555 && + jv.isMember(jss::load_base) && + jv[jss::load_base] == 256 && + jv.isMember(jss::load_factor_server) && + jv[jss::load_factor_server] == 256 && + jv.isMember(jss::load_factor_fee_escalation) && + jv[jss::load_factor_fee_escalation] == 227555 && + jv.isMember(jss::load_factor_fee_queue) && + jv[jss::load_factor_fee_queue] == 256 && + jv.isMember(jss::load_factor_fee_reference) && + jv[jss::load_factor_fee_reference] == 256; + })); + + env.close(); + + // Closing ledger should publish a status update + BEAST_EXPECT(wsc->findMsg(5s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus" && + jv.isMember(jss::load_factor) && + jv[jss::load_factor] == 256 && + jv.isMember(jss::load_base) && + jv[jss::load_base] == 256 && + jv.isMember(jss::load_factor_server) && + jv[jss::load_factor_server] == 256 && + jv.isMember(jss::load_factor_fee_escalation) && + jv[jss::load_factor_fee_escalation] == 256 && + jv.isMember(jss::load_factor_fee_queue) && + jv[jss::load_factor_fee_queue] == 256 && + jv.isMember(jss::load_factor_fee_reference) && + jv[jss::load_factor_fee_reference] == 256; + })); + + checkMetrics(env, 0, 8, 0, 4, 256); + + // Fund then next few accounts at non escalated fee + env.fund(XRP(50000), noripple(e,f,g,h,i)); + + // Extra transactions with low fee are queued + auto queued = ter(terQUEUED); + env(noop(a), fee(10), queued); + env(noop(b), fee(10), queued); + env(noop(c), fee(10), queued); + env(noop(d), fee(10), queued); + env(noop(e), fee(10), queued); + env(noop(f), fee(10), queued); + env(noop(g), fee(10), queued); + checkMetrics(env, 7, 8, 5, 4, 256); + + // Last transaction escalates the fee + BEAST_EXPECT(wsc->findMsg(5s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus" && + jv.isMember(jss::load_factor) && + jv[jss::load_factor] == 200000 && + jv.isMember(jss::load_base) && + jv[jss::load_base] == 256 && + jv.isMember(jss::load_factor_server) && + jv[jss::load_factor_server] == 256 && + jv.isMember(jss::load_factor_fee_escalation) && + jv[jss::load_factor_fee_escalation] == 200000 && + jv.isMember(jss::load_factor_fee_queue) && + jv[jss::load_factor_fee_queue] == 256 && + jv.isMember(jss::load_factor_fee_reference) && + jv[jss::load_factor_fee_reference] == 256; + })); + + env.close(); + // Ledger close publishes with escalated fees for queued transactions + BEAST_EXPECT(wsc->findMsg(5s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus" && + jv.isMember(jss::load_factor) && + jv[jss::load_factor] == 184320 && + jv.isMember(jss::load_base) && + jv[jss::load_base] == 256 && + jv.isMember(jss::load_factor_server) && + jv[jss::load_factor_server] == 256 && + jv.isMember(jss::load_factor_fee_escalation) && + jv[jss::load_factor_fee_escalation] == 184320 && + jv.isMember(jss::load_factor_fee_queue) && + jv[jss::load_factor_fee_queue] == 256 && + jv.isMember(jss::load_factor_fee_reference) && + jv[jss::load_factor_fee_reference] == 256; + })); + + env.close(); + // ledger close clears queue so fee is back to normal + BEAST_EXPECT(wsc->findMsg(5s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus" && + jv.isMember(jss::load_factor) && + jv[jss::load_factor] == 256 && + jv.isMember(jss::load_base) && + jv[jss::load_base] == 256 && + jv.isMember(jss::load_factor_server) && + jv[jss::load_factor_server] == 256 && + jv.isMember(jss::load_factor_fee_escalation) && + jv[jss::load_factor_fee_escalation] == 256 && + jv.isMember(jss::load_factor_fee_queue) && + jv[jss::load_factor_fee_queue] == 256 && + jv.isMember(jss::load_factor_fee_reference) && + jv[jss::load_factor_fee_reference] == 256; + })); + + BEAST_EXPECT(!wsc->findMsg(1s, + [&](auto const& jv) + { + return jv[jss::type] == "serverStatus"; + })); + + auto jv = wsc->invoke("unsubscribe", stream); + BEAST_EXPECT(jv[jss::status] == "success"); + + } + void testClearQueuedAccountTxs() { using namespace jtx; @@ -2645,6 +2814,7 @@ public: testSignAndSubmitSequence(); testAccountInfo(); testServerInfo(); + testServerSubscribe(); testClearQueuedAccountTxs(); } };