Inject Application:

Calls to getApp are replaced with dependency injections.
This commit is contained in:
Vinnie Falco
2015-09-14 14:25:23 -07:00
committed by Edward Hennis
parent f4fe55caff
commit 9b787434c9
132 changed files with 1203 additions and 1063 deletions

View File

@@ -50,13 +50,14 @@
namespace ripple {
PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint,
PeerImp::PeerImp (Application& app, id_t id, endpoint_type remote_endpoint,
PeerFinder::Slot::ptr const& slot, beast::http::message&& request,
protocol::TMHello const& hello, RippleAddress const& publicKey,
Resource::Consumer consumer,
std::unique_ptr<beast::asio::ssl_bundle>&& ssl_bundle,
OverlayImpl& overlay)
: Child (overlay)
, app_ (app)
, id_(id)
, sink_(deprecatedLogs().journal("Peer"), makePrefix(id))
, p_sink_(deprecatedLogs().journal("Protocol"), makePrefix(id))
@@ -603,7 +604,7 @@ void PeerImp::doAccept()
"Protocol: " << to_string(protocol);
if(journal_.info) journal_.info <<
"Public Key: " << publicKey_.humanNodePublic();
bool const cluster = getApp().getUNL().nodeInCluster(publicKey_, name_);
bool const cluster = app_.getUNL().nodeInCluster(publicKey_, name_);
if (cluster)
if (journal_.info) journal_.info <<
"Cluster name: " << name_;
@@ -647,7 +648,7 @@ PeerImp::makeResponse (bool crawl,
resp.headers.append("Connect-AS", "Peer");
resp.headers.append("Server", BuildInfo::getFullVersionString());
resp.headers.append ("Crawl", crawl ? "public" : "private");
protocol::TMHello hello = buildHello(sharedValue, getApp());
protocol::TMHello hello = buildHello(sharedValue, app_);
appendHello(resp, hello);
return resp;
}
@@ -811,7 +812,7 @@ PeerImp::error_code
PeerImp::onMessageBegin (std::uint16_t type,
std::shared_ptr <::google::protobuf::Message> const& m)
{
load_event_ = getApp().getJobQueue ().getLoadEventAP (
load_event_ = app_.getJobQueue ().getLoadEventAP (
jtPEER, protocolMessageName(type));
fee_ = Resource::feeLightPeer;
return error_code{};
@@ -836,7 +837,7 @@ PeerImp::onMessage (std::shared_ptr<protocol::TMManifests> const& m)
{
// VFALCO What's the right job type?
auto that = shared_from_this();
getApp().getJobQueue().addJob (
app_.getJobQueue().addJob (
jtVALIDATION_ut, "receiveManifests",
[this, that, m] (Job&) { overlay_.onManifests(m, that); });
}
@@ -901,7 +902,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
RippleAddress nodePub;
nodePub.setNodePublic(node.publickey());
getApp().getUNL().nodeUpdate(nodePub, s);
app_.getUNL().nodeUpdate(nodePub, s);
}
int loadSources = m->loadsources().size();
@@ -921,7 +922,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
overlay_.resourceManager().importConsumers (name_, gossip);
}
getApp().getFeeTrack().setClusterFee(getApp().getUNL().getClusterFee());
app_.getFeeTrack().setClusterFee(app_.getUNL().getClusterFee());
}
void
@@ -1008,7 +1009,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
if (sanity_.load() == Sanity::insane)
return;
if (getApp().getOPs().isNeedNetworkLedger ())
if (app_.getOPs().isNeedNetworkLedger ())
{
// If we've never been in synch, there's nothing we can do
// with a transaction
@@ -1024,7 +1025,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
int flags;
if (! getApp().getHashRouter ().addSuppressionPeer (
if (! app_.getHashRouter ().addSuppressionPeer (
txID, id_, flags))
{
// we have seen this transaction recently
@@ -1058,14 +1059,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
}
}
if (getApp().getJobQueue().getJobCount(jtTRANSACTION) > 100)
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > 100)
p_journal_.info << "Transaction queue is full";
else if (getApp().getLedgerMaster().getValidatedLedgerAge() > 240)
else if (app_.getLedgerMaster().getValidatedLedgerAge() > 240)
p_journal_.trace << "No new transactions until synchronized";
else
{
std::weak_ptr<PeerImp> weak = shared_from_this();
getApp().getJobQueue ().addJob (
app_.getJobQueue ().addJob (
jtTRANSACTION, "recvTransaction->checkTransaction",
[weak, flags, stx] (Job&) {
if (auto peer = weak.lock())
@@ -1085,7 +1086,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetLedger> const& m)
{
fee_ = Resource::feeMediumBurdenPeer;
std::weak_ptr<PeerImp> weak = shared_from_this();
getApp().getJobQueue().addJob (
app_.getJobQueue().addJob (
jtLEDGER_REQ, "recvGetLedger",
[weak, m] (Job&) {
if (auto peer = weak.lock())
@@ -1137,7 +1138,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
// got data for a candidate transaction set
std::weak_ptr<PeerImp> weak = shared_from_this();
auto& journal = p_journal_;
getApp().getJobQueue().addJob(
app_.getJobQueue().addJob(
jtTXN_DATA, "recvPeerData",
[weak, hash, journal, m] (Job&) {
if (auto peer = weak.lock())
@@ -1146,7 +1147,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
return;
}
if (!getApp().getInboundLedgers ().gotLedgerData (
if (!app_.getInboundLedgers ().gotLedgerData (
hash, shared_from_this(), m))
{
p_journal_.trace << "Got data for unwanted ledger";
@@ -1163,7 +1164,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
set.set_hops(set.hops() + 1);
// VFALCO Magic numbers are bad
if ((set.closetime() + 180) < getApp().timeKeeper().closeTime().time_since_epoch().count())
if ((set.closetime() + 180) < app_.timeKeeper().closeTime().time_since_epoch().count())
return;
auto const type = publicKeyType(
@@ -1198,7 +1199,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()),
Blob(set.signature ().begin (), set.signature ().end ()));
if (! getApp().getHashRouter ().addSuppressionPeer (suppression, id_))
if (! app_.getHashRouter ().addSuppressionPeer (suppression, id_))
{
p_journal_.trace << "Proposal: duplicate";
return;
@@ -1213,7 +1214,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
return;
}
bool isTrusted = getApp().getUNL ().nodeInUNL (signerPublic);
bool isTrusted = app_.getUNL ().nodeInUNL (signerPublic);
if (!isTrusted)
{
@@ -1223,7 +1224,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
return;
}
if (getApp().getFeeTrack ().isLoadedLocal ())
if (app_.getFeeTrack ().isLoadedLocal ())
{
p_journal_.debug << "Proposal: Dropping UNTRUSTED (load)";
return;
@@ -1239,7 +1240,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
suppression);
std::weak_ptr<PeerImp> weak = shared_from_this();
getApp().getJobQueue ().addJob (
app_.getJobQueue ().addJob (
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose",
[weak, m, proposal] (Job& job) {
if (auto peer = weak.lock())
@@ -1253,7 +1254,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
p_journal_.trace << "Status: Change";
if (!m->has_networktime ())
m->set_networktime (getApp().timeKeeper().now().time_since_epoch().count());
m->set_networktime (app_.timeKeeper().now().time_since_epoch().count());
if (!last_status_.has_newstatus () || m->has_newstatus ())
last_status_ = *m;
@@ -1313,9 +1314,9 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMStatusChange> const& m)
}
if (m->has_ledgerseq() &&
getApp().getLedgerMaster().getValidatedLedgerAge() < 120)
app_.getLedgerMaster().getValidatedLedgerAge() < 120)
{
checkSanity (m->ledgerseq(), getApp().getLedgerMaster().getValidLedgerIndex());
checkSanity (m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
}
}
@@ -1432,7 +1433,7 @@ void
PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
{
error_code ec;
auto const closeTime = getApp().timeKeeper().closeTime().time_since_epoch().count();
auto const closeTime = app_.timeKeeper().closeTime().time_since_epoch().count();
if (m->has_hops() && ! slot_->cluster())
m->set_hops(m->hops() + 1);
@@ -1460,23 +1461,23 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
return;
}
if (! getApp().getHashRouter ().addSuppressionPeer(
if (! app_.getHashRouter ().addSuppressionPeer(
sha512Half(makeSlice(m->validation())), id_))
{
p_journal_.trace << "Validation: duplicate";
return;
}
bool isTrusted = getApp().getUNL ().nodeInUNL (val->getSignerPublic ());
bool isTrusted = app_.getUNL ().nodeInUNL (val->getSignerPublic ());
if (!isTrusted && (sanity_.load () == Sanity::insane))
{
p_journal_.debug <<
"Validation: dropping untrusted from insane peer";
}
if (isTrusted || !getApp().getFeeTrack ().isLoadedLocal ())
if (isTrusted || !app_.getFeeTrack ().isLoadedLocal ())
{
std::weak_ptr<PeerImp> weak = shared_from_this();
getApp().getJobQueue ().addJob (
app_.getJobQueue ().addJob (
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
"recvValidation->checkValidation",
[weak, val, isTrusted, m] (Job&) {
@@ -1552,7 +1553,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
// VFALCO TODO Move this someplace more sensible so we dont
// need to inject the NodeStore interfaces.
std::shared_ptr<NodeObject> hObj =
getApp().getNodeStore ().fetch (hash);
app_.getNodeStore ().fetch (hash);
if (hObj)
{
@@ -1597,7 +1598,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
"GetObj: Full fetch pack for " << pLSeq;
pLSeq = obj.ledgerseq ();
pLDo = !getApp().getLedgerMaster ().haveLedger (pLSeq);
pLDo = !app_.getLedgerMaster ().haveLedger (pLSeq);
if (!pLDo)
p_journal_.debug <<
@@ -1616,7 +1617,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
std::make_shared< Blob > (
obj.data ().begin (), obj.data ().end ()));
getApp().getLedgerMaster ().addFetchPack (hash, data);
app_.getLedgerMaster ().addFetchPack (hash, data);
}
}
}
@@ -1626,7 +1627,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
p_journal_.debug << "GetObj: Partial fetch pack for " << pLSeq;
if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
getApp().getLedgerMaster ().gotFetchPack (progress, pLSeq);
app_.getLedgerMaster ().gotFetchPack (progress, pLSeq);
}
}
@@ -1655,7 +1656,7 @@ PeerImp::sendHello()
if (! success)
return false;
auto const hello = buildHello (sharedValue_, getApp());
auto const hello = buildHello (sharedValue_, app_);
auto const m = std::make_shared<Message> (
std::move(hello), protocol::mtHELLO);
send (m);
@@ -1685,9 +1686,9 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
// VFALCO TODO Invert this dependency using an observer and shared state object.
// Don't queue fetch pack jobs if we're under load or we already have
// some queued.
if (getApp().getFeeTrack ().isLoadedLocal () ||
(getApp().getLedgerMaster().getValidatedLedgerAge() > 40) ||
(getApp().getJobQueue().getJobCount(jtPACK) > 10))
if (app_.getFeeTrack ().isLoadedLocal () ||
(app_.getLedgerMaster().getValidatedLedgerAge() > 40) ||
(app_.getJobQueue().getJobCount(jtPACK) > 10))
{
p_journal_.info << "Too busy to make fetch pack";
return;
@@ -1707,10 +1708,11 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
std::weak_ptr<PeerImp> weak = shared_from_this();
auto elapsed = UptimeTimer::getInstance().getElapsedSeconds();
getApp().getJobQueue ().addJob (
auto const pap = &app_;
app_.getJobQueue ().addJob (
jtPACK, "MakeFetchPack",
[weak, packet, hash, elapsed] (Job&) {
getApp().getLedgerMaster().makeFetchPack(
[pap, weak, packet, hash, elapsed] (Job&) {
pap->getLedgerMaster().makeFetchPack(
weak, packet, hash, elapsed);
});
}
@@ -1724,9 +1726,9 @@ PeerImp::checkTransaction (int flags, STTx::pointer stx)
// Expired?
if (stx->isFieldPresent(sfLastLedgerSequence) &&
(stx->getFieldU32 (sfLastLedgerSequence) <
getApp().getLedgerMaster().getValidLedgerIndex()))
app_.getLedgerMaster().getValidLedgerIndex()))
{
getApp().getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
charge (Resource::feeUnwantedData);
return;
}
@@ -1742,22 +1744,22 @@ PeerImp::checkTransaction (int flags, STTx::pointer stx)
if (! reason.empty ())
p_journal_.trace << "Exception checking transaction: " << reason;
getApp().getHashRouter ().setFlags (stx->getTransactionID (), SF_BAD);
app_.getHashRouter ().setFlags (stx->getTransactionID (), SF_BAD);
charge (Resource::feeInvalidSignature);
return;
}
else
{
getApp().getHashRouter ().setFlags (stx->getTransactionID (), SF_SIGGOOD);
app_.getHashRouter ().setFlags (stx->getTransactionID (), SF_SIGGOOD);
}
bool const trusted (flags & SF_TRUSTED);
getApp().getOPs ().processTransaction (
app_.getOPs ().processTransaction (
tx, trusted, false, NetworkOPs::FailHard::no);
}
catch (...)
{
getApp().getHashRouter ().setFlags (stx->getTransactionID (), SF_BAD);
app_.getHashRouter ().setFlags (stx->getTransactionID (), SF_BAD);
charge (Resource::feeBadData);
}
}
@@ -1786,15 +1788,15 @@ PeerImp::checkPropose (Job& job,
if (isTrusted)
{
getApp().getOPs ().processTrustedProposal (
app_.getOPs ().processTrustedProposal (
proposal, packet, publicKey_);
}
else
{
uint256 consensusLCL;
{
std::lock_guard<Application::MutexType> lock (getApp().getMasterMutex());
consensusLCL = getApp().getOPs ().getConsensusLCL ();
std::lock_guard<Application::MutexType> lock (app_.getMasterMutex());
consensusLCL = app_.getOPs ().getConsensusLCL ();
}
if (consensusLCL == proposal->getPrevLedger())
@@ -1828,7 +1830,7 @@ PeerImp::checkValidation (STValidation::pointer val,
return;
}
if (getApp().getOPs ().recvValidation(
if (app_.getOPs ().recvValidation(
val, std::to_string(id())))
overlay_.relay(*packet, signingHash);
}
@@ -1928,7 +1930,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
uint256 txHash;
memcpy (txHash.begin (), packet.ledgerhash ().data (), 32);
shared = getApp().getInboundTransactions().getSet (txHash, false);
shared = app_.getInboundTransactions().getSet (txHash, false);
map = shared.get();
if (! map)
@@ -1973,7 +1975,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
return;
}
if (getApp().getFeeTrack().isLoadedLocal() && ! cluster())
if (app_.getFeeTrack().isLoadedLocal() && ! cluster())
{
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Too busy";
@@ -2000,7 +2002,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
memcpy (ledgerhash.begin (), packet.ledgerhash ().data (), 32);
logMe += "LedgerHash:";
logMe += to_string (ledgerhash);
ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
ledger = app_.getLedgerMaster ().getLedgerByHash (ledgerhash);
if (!ledger)
if (p_journal_.trace) p_journal_.trace <<
@@ -2034,13 +2036,13 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
else if (packet.has_ledgerseq ())
{
if (packet.ledgerseq() <
getApp().getLedgerMaster().getEarliestFetch())
app_.getLedgerMaster().getEarliestFetch())
{
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Early ledger request";
return;
}
ledger = getApp().getLedgerMaster ().getLedgerBySeq (
ledger = app_.getLedgerMaster ().getLedgerBySeq (
packet.ledgerseq ());
if (! ledger)
if (p_journal_.debug) p_journal_.debug <<
@@ -2048,10 +2050,10 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
}
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCLOSED) )
{
ledger = getApp().getLedgerMaster ().getClosedLedger ();
ledger = app_.getLedgerMaster ().getClosedLedger ();
if (ledger && ledger->info().open)
ledger = getApp().getLedgerMaster ().getLedgerBySeq (
ledger = app_.getLedgerMaster ().getLedgerBySeq (
ledger->info().seq - 1);
}
else
@@ -2075,7 +2077,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
}
if (!packet.has_ledgerseq() && (ledger->info().seq <
getApp().getLedgerMaster().getEarliestFetch()))
app_.getLedgerMaster().getEarliestFetch()))
{
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Early ledger request";
@@ -2240,7 +2242,7 @@ PeerImp::peerTXData (uint256 const& hash,
std::shared_ptr <protocol::TMLedgerData> const& pPacket,
beast::Journal journal)
{
getApp().getInboundTransactions().gotData (hash, shared_from_this(), pPacket);
app_.getInboundTransactions().gotData (hash, shared_from_this(), pPacket);
}
int