Track and report peer load:

* PeerImp::charge only calls fail if dispatched from the peer
* Add "load" to output of RPC command "peer"
* Add Resource::Charge values for peer commands
* Impose some fee for every peer command
* Cleanup fee imposition
This commit is contained in:
David Schwartz
2015-03-05 15:11:26 -08:00
committed by Nik Bougalis
parent ff7dc0b446
commit e44e75fa6b
5 changed files with 54 additions and 27 deletions

View File

@@ -69,6 +69,7 @@ PeerImp::PeerImp (id_t id, endpoint_type remote_endpoint,
, publicKey_(publicKey)
, hello_(hello)
, usage_(consumer)
, fee_ (Resource::feeLightPeer)
, slot_ (slot)
, http_message_(std::move(request))
, validatorsConnection_(getApp().getValidators().newConnection(id))
@@ -179,8 +180,12 @@ PeerImp::send (Message::pointer const& m)
void
PeerImp::charge (Resource::Charge const& fee)
{
if ((usage_.charge(fee) == Resource::drop) && usage_.disconnect())
if ((usage_.charge(fee) == Resource::drop) &&
usage_.disconnect() && strand_.running_in_this_thread())
{
// Sever the connection
fail("charge: Resources");
}
}
//------------------------------------------------------------------------------
@@ -222,6 +227,8 @@ PeerImp::json()
ret[jss::name] = name_;
}
ret[jss::load] = usage_.balance ();
if (hello_.has_fullversion ())
ret[jss::version] = hello_.fullversion ();
@@ -687,6 +694,7 @@ PeerImp::onMessageBegin (std::uint16_t type,
{
load_event_ = getApp().getJobQueue ().getLoadEventAP (
jtPEER, protocolMessageName(type));
fee_ = Resource::feeLightPeer;
return error_code{};
}
@@ -695,6 +703,7 @@ PeerImp::onMessageEnd (std::uint16_t,
std::shared_ptr <::google::protobuf::Message> const&)
{
load_event_.reset();
charge (fee_);
}
void
@@ -708,6 +717,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMPing> const& m)
{
if (m->type () == protocol::TMPing::ptPING)
{
fee_ = Resource::feeMediumBurdenPeer;
m->set_type (protocol::TMPing::ptPONG);
send (std::make_shared<Message> (*m, protocol::mtPING));
}
@@ -718,7 +728,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
{
// VFALCO NOTE I think we should drop the peer immediately
if (! cluster())
return charge (Resource::feeUnwantedData);
{
fee_ = Resource::feeUnwantedData;
return;
}
for (int i = 0; i < m->clusternodes().size(); ++i)
{
@@ -853,7 +866,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
// we have seen this transaction recently
if (flags & SF_BAD)
{
charge (Resource::feeInvalidSignature);
fee_ = Resource::feeInvalidSignature;
return;
}
@@ -901,6 +914,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetLedger> const& m)
{
fee_ = Resource::feeMediumBurdenPeer;
getApp().getJobQueue().addJob (jtPACK, "recvGetLedger", std::bind(
beast::weak_fn(&PeerImp::getLedger, shared_from_this()), m));
}
@@ -928,7 +942,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
else
{
p_journal_.info << "Unable to route TX/ledger data reply";
charge (Resource::feeUnwantedData);
fee_ = Resource::feeUnwantedData;
}
return;
}
@@ -938,7 +952,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
if (m->ledgerhash ().size () != 32)
{
p_journal_.warning << "TX candidate reply with invalid hash size";
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
return;
}
@@ -957,7 +971,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMLedgerData> const& m)
hash, shared_from_this(), m))
{
p_journal_.trace << "Got data for unwanted ledger";
charge (Resource::feeUnwantedData);
fee_ = Resource::feeUnwantedData;
}
}
@@ -981,14 +995,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMProposeSet> const& m)
)
{
p_journal_.warning << "Proposal: malformed";
charge (Resource::feeInvalidSignature);
fee_ = Resource::feeInvalidSignature;
return;
}
if (set.has_previousledger () && (set.previousledger ().size () != 32))
{
p_journal_.warning << "Proposal: malformed";
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
return;
}
@@ -1120,7 +1134,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMHaveTransactionSet> const& m)
if (m->hash ().size () != (256 / 8))
{
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
return;
}
@@ -1139,7 +1153,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMHaveTransactionSet> const& m)
if (!getApp().getOPs ().hasTXSet (
shared_from_this (), hash, m->status ()))
charge (Resource::feeUnwantedData);
fee_ = Resource::feeUnwantedData;
}
}
@@ -1152,7 +1166,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
if (m->validation ().size () < 50)
{
p_journal_.warning << "Validation: Too small";
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
return;
}
@@ -1166,7 +1180,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
if (closeTime > (120 + val->getFieldU32(sfSigningTime)))
{
p_journal_.trace << "Validation: Too old";
charge (Resource::feeUnwantedData);
fee_ = Resource::feeUnwantedData;
return;
}
@@ -1196,13 +1210,13 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMValidation> const& m)
{
p_journal_.warning <<
"Validation: Exception, " << e.what();
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
}
catch (...)
{
p_journal_.warning <<
"Validation: Unknown exception";
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
}
}
@@ -1220,6 +1234,8 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
return;
}
fee_ = Resource::feeMediumBurdenPeer;
protocol::TMGetObjectByHash reply;
reply.set_query (false);
@@ -1403,10 +1419,12 @@ PeerImp::doFetchPack (const std::shared_ptr<protocol::TMGetObjectByHash>& packet
if (packet->ledgerhash ().size () != 32)
{
p_journal_.warning << "FetchPack hash size malformed";
charge (Resource::feeInvalidRequest);
fee_ = Resource::feeInvalidRequest;
return;
}
fee_ = Resource::feeHighBurdenPeer;
uint256 hash;
memcpy (hash.begin (), packet->ledgerhash ().data (), 32);
@@ -1429,7 +1447,8 @@ PeerImp::checkTransaction (Job&, int flags,
getApp().getLedgerMaster().getValidLedgerIndex()))
{
getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD);
return charge (Resource::feeUnwantedData);
charge (Resource::feeUnwantedData);
return;
}
auto validate = (flags & SF_SIGGOOD) ? Validate::NO : Validate::YES;
@@ -1438,7 +1457,8 @@ PeerImp::checkTransaction (Job&, int flags,
if (tx->getStatus () == INVALID)
{
getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD);
return charge (Resource::feeInvalidSignature);
charge (Resource::feeInvalidSignature);
return;
}
else
{
@@ -1451,7 +1471,7 @@ PeerImp::checkTransaction (Job&, int flags,
catch (...)
{
getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD);
charge (Resource::feeInvalidRequest);
charge (Resource::feeBadData);
}
}