Publish server stream when fee changes (RIPD-1406):

Resolves #1991

Publish a server status update after every ledger close or open
ledger update if there is a change in fees.
Author: Brad Chase
Date: 2017-02-07 10:21:28 -05:00
Committed by: seelabs
Parent: 30b6e4e2e5
Commit: f1377d5d30
3 changed files with 282 additions and 18 deletions
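
With this change, a serverStatus message on the server stream reports the open ledger's base fee and, when fee escalation is active, the escalation fields alongside the existing load fields. A rough sketch of such a message, with field names taken from the diff below and purely illustrative values and operating mode:

    {
        "type": "serverStatus",
        "server_status": "full",
        "base_fee": 10,
        "load_base": 256,
        "load_factor": 227555,
        "load_factor_server": 256,
        "load_factor_fee_escalation": 227555,
        "load_factor_fee_queue": 256,
        "load_factor_fee_reference": 256
    }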


@@ -1009,6 +1009,9 @@ void LedgerConsensusImp<Traits>::accept (TxSet_t const& set)
             // Stuff the ledger with transactions from the queue.
             return app_.getTxQ().accept(app_, view);
         });
+        // Signal a potential fee change to subscribers after the open ledger
+        // is created
+        app_.getOPs().reportFeeChange();
     }

     ledgerMaster_.switchLCL (sharedLCL);


@@ -180,6 +180,30 @@ class NetworkOPsImp final
         Json::Value json() const;
     };

+    //! Server fees published on `server` subscription
+    struct ServerFeeSummary
+    {
+        ServerFeeSummary() = default;
+
+        ServerFeeSummary(std::uint64_t fee,
+            boost::optional<TxQ::Metrics>&& escalationMetrics,
+            LoadFeeTrack const & loadFeeTrack);
+
+        bool
+        operator !=(ServerFeeSummary const & b) const;
+
+        bool
+        operator ==(ServerFeeSummary const & b) const
+        {
+            return !(*this != b);
+        }
+
+        std::uint32_t loadFactorServer = 256;
+        std::uint32_t loadBaseServer = 256;
+        std::uint64_t baseFee = 10;
+        boost::optional<TxQ::Metrics> em = boost::none;
+    };
+
 public:
     // VFALCO TODO Make LedgerMaster a SharedPtr or a reference.
     //
@@ -202,8 +226,6 @@ public:
         , mLedgerConsensus (mConsensus->makeLedgerConsensus (
             app, app.getInboundTransactions(), ledgerMaster, *m_localTX))
         , m_ledgerMaster (ledgerMaster)
-        , mLastLoadBase (256)
-        , mLastLoadFactor (256)
         , m_job_queue (job_queue)
         , m_standalone (standalone)
         , m_network_quorum (start_valid ? 0 : network_quorum)
@@ -557,8 +579,8 @@ private:
     SubMapType mSubValidations;    // Received validations.
     SubMapType mSubPeerStatus;     // peer status changes

-    std::uint32_t mLastLoadBase;
-    std::uint32_t mLastLoadFactor;
+    ServerFeeSummary mLastFeeSummary;

     JobQueue& m_job_queue;
@@ -952,6 +974,7 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
 {
     auto lock = make_lock(app_.getMasterMutex());
+    bool changed = false;
     {
         std::lock_guard <std::recursive_mutex> lock (
             m_ledgerMaster.peekMutex());
@@ -959,7 +982,6 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
         app_.openLedger().modify(
             [&](OpenView& view, beast::Journal j)
             {
-                bool changed = false;
                 for (TransactionStatus& e : transactions)
                 {
                     // we check before addingto the batch
@@ -977,6 +999,8 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
                 return changed;
             });
     }
+    if (changed)
+        reportFeeChange();

     auto newOL = app_.openLedger().current();
     for (TransactionStatus& e : transactions)
@@ -1601,6 +1625,38 @@ void NetworkOPsImp::pubManifest (Manifest const& mo)
     }
 }

+NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
+        std::uint64_t fee,
+        boost::optional<TxQ::Metrics>&& escalationMetrics,
+        LoadFeeTrack const & loadFeeTrack)
+    : loadFactorServer{loadFeeTrack.getLoadFactor()}
+    , loadBaseServer{loadFeeTrack.getLoadBase()}
+    , baseFee{fee}
+    , em{std::move(escalationMetrics)}
+{
+}
+
+bool
+NetworkOPsImp::ServerFeeSummary::operator !=(NetworkOPsImp::ServerFeeSummary const & b) const
+{
+    if(loadFactorServer != b.loadFactorServer ||
+       loadBaseServer != b.loadBaseServer ||
+       baseFee != b.baseFee ||
+       em.is_initialized() != b.em.is_initialized())
+        return true;
+
+    if(em)
+    {
+        return (em->minFeeLevel != b.em->minFeeLevel ||
+                em->expFeeLevel != b.em->expFeeLevel ||
+                em->referenceFeeLevel != b.em->referenceFeeLevel);
+    }
+
+    return false;
+}
+
 void NetworkOPsImp::pubServer ()
 {
     // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
@@ -1612,14 +1668,45 @@ void NetworkOPsImp::pubServer ()
     if (!mSubServer.empty ())
     {
         Json::Value jvObj (Json::objectValue);
-        auto const& feeTrack = app_.getFeeTrack();
+        ServerFeeSummary f{app_.openLedger().current()->fees().base,
+            app_.getTxQ().getMetrics(*app_.openLedger().current()),
+            app_.getFeeTrack()};
+
+        // Need to cap to uint64 to uint32 due to JSON limitations
+        auto clamp = [](std::uint64_t v)
+        {
+            constexpr std::uint64_t max32 =
+                std::numeric_limits<std::uint32_t>::max();
+            return static_cast<std::uint32_t>(std::min(max32, v));
+        };

         jvObj [jss::type] = "serverStatus";
         jvObj [jss::server_status] = strOperatingMode ();
-        jvObj [jss::load_base] =
-            (mLastLoadBase = feeTrack.getLoadBase ());
-        jvObj [jss::load_factor] =
-            (mLastLoadFactor = feeTrack.getLoadFactor ());
+        jvObj [jss::load_base] = f.loadBaseServer;
+        jvObj [jss::load_factor_server] = f.loadFactorServer;
+        jvObj [jss::base_fee] = clamp(f.baseFee);
+
+        if(f.em)
+        {
+            auto const loadFactor =
+                std::max(static_cast<std::uint64_t>(f.loadFactorServer),
+                    mulDiv(f.em->expFeeLevel, f.loadBaseServer,
+                        f.em->referenceFeeLevel).second);
+
+            jvObj [jss::load_factor] = clamp(loadFactor);
+            jvObj [jss::load_factor_fee_escalation] = clamp(f.em->expFeeLevel);
+            jvObj [jss::load_factor_fee_queue] = clamp(f.em->minFeeLevel);
+            jvObj [jss::load_factor_fee_reference]
+                = clamp(f.em->referenceFeeLevel);
+        }
+        else
+            jvObj [jss::load_factor] = f.loadFactorServer;
+
+        mLastFeeSummary = f;

         for (auto i = mSubServer.begin (); i != mSubServer.end (); )
         {
@@ -2108,6 +2195,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
     constexpr std::uint64_t max32 =
         std::numeric_limits<std::uint32_t>::max();
     auto const loadFactorServer = app_.getFeeTrack().getLoadFactor();
     auto const loadBaseServer = app_.getFeeTrack().getLoadBase();
     auto const loadFactorFeeEscalation = escalationMetrics ?
@@ -2367,14 +2455,17 @@ void NetworkOPsImp::pubLedger (
 void NetworkOPsImp::reportFeeChange ()
 {
-    auto const& feeTrack = app_.getFeeTrack();
-    if ((feeTrack.getLoadBase () == mLastLoadBase) &&
-        (feeTrack.getLoadFactor () == mLastLoadFactor))
-        return;
-    m_job_queue.addJob (
-        jtCLIENT, "reportFeeChange->pubServer",
-        [this] (Job&) { pubServer(); });
+    ServerFeeSummary f{app_.openLedger().current()->fees().base,
+        app_.getTxQ().getMetrics(*app_.openLedger().current()),
+        app_.getFeeTrack()};
+
+    // only schedule the job if something has changed
+    if (f != mLastFeeSummary)
+    {
+        m_job_queue.addJob ( jtCLIENT, "reportFeeChange->pubServer",
+            [this] (Job&) { pubServer(); });
+    }
 }

 // This routine should only be used to publish accepted or validated

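When escalation metrics are present, pubServer() publishes load_factor as the larger of the server's own load factor and the escalated open-ledger fee level rescaled into load_base units, clamped to 32 bits for JSON. A minimal standalone sketch of that arithmetic, with plain 64-bit integer math standing in for rippled's overflow-safe mulDiv helper:

    #include <algorithm>
    #include <cstdint>
    #include <limits>

    // Sketch only: mirrors the published load_factor computation above.
    std::uint32_t
    effectiveLoadFactor(std::uint32_t loadFactorServer,
                        std::uint32_t loadBaseServer,
                        std::uint64_t expFeeLevel,
                        std::uint64_t referenceFeeLevel)
    {
        // Escalated open-ledger fee level rescaled into load_base units.
        std::uint64_t const escalated =
            expFeeLevel * loadBaseServer / referenceFeeLevel;

        // Publish the larger of the server load factor and the escalated level.
        std::uint64_t const loadFactor =
            std::max<std::uint64_t>(loadFactorServer, escalated);

        // Clamp to 32 bits, as the JSON output does.
        constexpr std::uint64_t max32 =
            std::numeric_limits<std::uint32_t>::max();
        return static_cast<std::uint32_t>(std::min(max32, loadFactor));
    }

For example, with loadFactorServer = 256, loadBaseServer = 256, referenceFeeLevel = 256 and expFeeLevel = 227555, this yields a published load_factor of 227555, the first escalated value checked by the new test below.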

@@ -32,6 +32,7 @@
 #include <test/jtx.h>
 #include <test/jtx/ticket.h>
 #include <boost/optional.hpp>
+#include <test/jtx/WSClient.h>

 namespace ripple {
 namespace test {
@@ -2416,6 +2417,174 @@ public:
         }
     }

+    void testServerSubscribe()
+    {
+        using namespace jtx;
+
+        Env env(*this, makeConfig({ { "minimum_txn_in_ledger_standalone", "3" } }),
+            features(featureFeeEscalation));
+
+        Json::Value stream;
+        stream[jss::streams] = Json::arrayValue;
+        stream[jss::streams].append("server");
+        auto wsc = makeWSClient(env.app().config());
+        {
+            auto jv = wsc->invoke("subscribe", stream);
+            BEAST_EXPECT(jv[jss::status] == "success");
+        }
+
+        Account a{"a"}, b{"b"}, c{"c"}, d{"d"}, e{"e"}, f{"f"},
+            g{"g"}, h{"h"}, i{"i"};
+
+        // Fund the first few accounts at non escalated fee
+        env.fund(XRP(50000), noripple(a,b,c,d));
+        checkMetrics(env, 0, boost::none, 4, 3, 256);
+
+        // First transaction establishes the messaging
+        BEAST_EXPECT(wsc->findMsg(5s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus" &&
+                    jv.isMember(jss::load_factor) &&
+                    jv[jss::load_factor] == 256 &&
+                    jv.isMember(jss::load_base) &&
+                    jv[jss::load_base] == 256 &&
+                    jv.isMember(jss::load_factor_server) &&
+                    jv[jss::load_factor_server] == 256 &&
+                    jv.isMember(jss::load_factor_fee_escalation) &&
+                    jv[jss::load_factor_fee_escalation] == 256 &&
+                    jv.isMember(jss::load_factor_fee_queue) &&
+                    jv[jss::load_factor_fee_queue] == 256 &&
+                    jv.isMember(jss::load_factor_fee_reference) &&
+                    jv[jss::load_factor_fee_reference] == 256;
+            }));
+
+        // Last transaction escalates the fee
+        BEAST_EXPECT(wsc->findMsg(5s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus" &&
+                    jv.isMember(jss::load_factor) &&
+                    jv[jss::load_factor] == 227555 &&
+                    jv.isMember(jss::load_base) &&
+                    jv[jss::load_base] == 256 &&
+                    jv.isMember(jss::load_factor_server) &&
+                    jv[jss::load_factor_server] == 256 &&
+                    jv.isMember(jss::load_factor_fee_escalation) &&
+                    jv[jss::load_factor_fee_escalation] == 227555 &&
+                    jv.isMember(jss::load_factor_fee_queue) &&
+                    jv[jss::load_factor_fee_queue] == 256 &&
+                    jv.isMember(jss::load_factor_fee_reference) &&
+                    jv[jss::load_factor_fee_reference] == 256;
+            }));
+
+        env.close();
+
+        // Closing ledger should publish a status update
+        BEAST_EXPECT(wsc->findMsg(5s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus" &&
+                    jv.isMember(jss::load_factor) &&
+                    jv[jss::load_factor] == 256 &&
+                    jv.isMember(jss::load_base) &&
+                    jv[jss::load_base] == 256 &&
+                    jv.isMember(jss::load_factor_server) &&
+                    jv[jss::load_factor_server] == 256 &&
+                    jv.isMember(jss::load_factor_fee_escalation) &&
+                    jv[jss::load_factor_fee_escalation] == 256 &&
+                    jv.isMember(jss::load_factor_fee_queue) &&
+                    jv[jss::load_factor_fee_queue] == 256 &&
+                    jv.isMember(jss::load_factor_fee_reference) &&
+                    jv[jss::load_factor_fee_reference] == 256;
+            }));
+
+        checkMetrics(env, 0, 8, 0, 4, 256);
+
+        // Fund then next few accounts at non escalated fee
+        env.fund(XRP(50000), noripple(e,f,g,h,i));
+
+        // Extra transactions with low fee are queued
+        auto queued = ter(terQUEUED);
+        env(noop(a), fee(10), queued);
+        env(noop(b), fee(10), queued);
+        env(noop(c), fee(10), queued);
+        env(noop(d), fee(10), queued);
+        env(noop(e), fee(10), queued);
+        env(noop(f), fee(10), queued);
+        env(noop(g), fee(10), queued);
+        checkMetrics(env, 7, 8, 5, 4, 256);
+
+        // Last transaction escalates the fee
+        BEAST_EXPECT(wsc->findMsg(5s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus" &&
+                    jv.isMember(jss::load_factor) &&
+                    jv[jss::load_factor] == 200000 &&
+                    jv.isMember(jss::load_base) &&
+                    jv[jss::load_base] == 256 &&
+                    jv.isMember(jss::load_factor_server) &&
+                    jv[jss::load_factor_server] == 256 &&
+                    jv.isMember(jss::load_factor_fee_escalation) &&
+                    jv[jss::load_factor_fee_escalation] == 200000 &&
+                    jv.isMember(jss::load_factor_fee_queue) &&
+                    jv[jss::load_factor_fee_queue] == 256 &&
+                    jv.isMember(jss::load_factor_fee_reference) &&
+                    jv[jss::load_factor_fee_reference] == 256;
+            }));
+
+        env.close();
+
+        // Ledger close publishes with escalated fees for queued transactions
+        BEAST_EXPECT(wsc->findMsg(5s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus" &&
+                    jv.isMember(jss::load_factor) &&
+                    jv[jss::load_factor] == 184320 &&
+                    jv.isMember(jss::load_base) &&
+                    jv[jss::load_base] == 256 &&
+                    jv.isMember(jss::load_factor_server) &&
+                    jv[jss::load_factor_server] == 256 &&
+                    jv.isMember(jss::load_factor_fee_escalation) &&
+                    jv[jss::load_factor_fee_escalation] == 184320 &&
+                    jv.isMember(jss::load_factor_fee_queue) &&
+                    jv[jss::load_factor_fee_queue] == 256 &&
+                    jv.isMember(jss::load_factor_fee_reference) &&
+                    jv[jss::load_factor_fee_reference] == 256;
+            }));
+
+        env.close();
+
+        // ledger close clears queue so fee is back to normal
+        BEAST_EXPECT(wsc->findMsg(5s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus" &&
+                    jv.isMember(jss::load_factor) &&
+                    jv[jss::load_factor] == 256 &&
+                    jv.isMember(jss::load_base) &&
+                    jv[jss::load_base] == 256 &&
+                    jv.isMember(jss::load_factor_server) &&
+                    jv[jss::load_factor_server] == 256 &&
+                    jv.isMember(jss::load_factor_fee_escalation) &&
+                    jv[jss::load_factor_fee_escalation] == 256 &&
+                    jv.isMember(jss::load_factor_fee_queue) &&
+                    jv[jss::load_factor_fee_queue] == 256 &&
+                    jv.isMember(jss::load_factor_fee_reference) &&
+                    jv[jss::load_factor_fee_reference] == 256;
+            }));
+
+        BEAST_EXPECT(!wsc->findMsg(1s,
+            [&](auto const& jv)
+            {
+                return jv[jss::type] == "serverStatus";
+            }));
+
+        auto jv = wsc->invoke("unsubscribe", stream);
+        BEAST_EXPECT(jv[jss::status] == "success");
+    }
+
     void testClearQueuedAccountTxs()
     {
         using namespace jtx;
@@ -2645,6 +2814,7 @@ public:
         testSignAndSubmitSequence();
         testAccountInfo();
         testServerInfo();
+        testServerSubscribe();
         testClearQueuedAccountTxs();
     }
 };