mirror of
https://github.com/Xahau/xahaud.git
synced 2025-12-06 17:27:52 +00:00
Tie the peer code into the new load management code.
This commit is contained in:
@@ -3,6 +3,8 @@
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
|
||||
#include "../database/database.h"
|
||||
|
||||
#include "LedgerMaster.h"
|
||||
#include "UniqueNodeList.h"
|
||||
#include "ConnectionPool.h"
|
||||
@@ -17,9 +19,10 @@
|
||||
#include "ValidationCollection.h"
|
||||
#include "Suppression.h"
|
||||
#include "SNTPClient.h"
|
||||
#include "../database/database.h"
|
||||
#include "JobQueue.h"
|
||||
#include "RPCHandler.h"
|
||||
#include "ProofOfWork.h"
|
||||
#include "LoadManager.h"
|
||||
|
||||
class RPCDoor;
|
||||
class PeerDoor;
|
||||
@@ -58,6 +61,8 @@ class Application
|
||||
SNTPClient mSNTPClient;
|
||||
JobQueue mJobQueue;
|
||||
RPCHandler mRPCHandler;
|
||||
ProofOfWorkGenerator mPOWGen;
|
||||
LoadManager mLoadMgr;
|
||||
|
||||
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB;
|
||||
|
||||
@@ -102,6 +107,8 @@ public:
|
||||
SuppressionTable& getSuppression() { return mSuppressions; }
|
||||
RPCHandler& getRPCHandler() { return mRPCHandler; }
|
||||
boost::recursive_mutex& getMasterLock() { return mMasterLock; }
|
||||
ProofOfWorkGenerator& getPowGen() { return mPOWGen; }
|
||||
LoadManager& getLoadManager() { return mLoadMgr; }
|
||||
|
||||
|
||||
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }
|
||||
|
||||
@@ -101,6 +101,12 @@ bool LoadManager::shouldCutoff(LoadSource& source) const
|
||||
return !source.isPrivileged() && (source.mBalance < mDebitLimit);
|
||||
}
|
||||
|
||||
bool LoadManager::adjust(LoadSource& source, LoadType t) const
|
||||
{ // FIXME: Scale by category
|
||||
LoadCost cost = mCosts[static_cast<int>(t)];
|
||||
return adjust(source, cost.mCost);
|
||||
}
|
||||
|
||||
bool LoadManager::adjust(LoadSource& source, int credits) const
|
||||
{ // return: true = need to warn/cutoff
|
||||
time_t now = time(NULL);
|
||||
|
||||
@@ -15,6 +15,7 @@ enum LoadType
|
||||
LT_RequestNoReply, // A request that we cannot satisfy
|
||||
LT_InvalidSignature, // An object whose signature we had to check and it failed
|
||||
LT_UnwantedData, // Data we have no use for
|
||||
LT_BadPoW, // Proof of work not valid
|
||||
|
||||
// Good things
|
||||
LT_NewTrusted, // A new transaction/validation/proposal we trust
|
||||
@@ -89,7 +90,7 @@ protected:
|
||||
|
||||
public:
|
||||
|
||||
LoadManager(int creditRate, int creditLimit, int debitWarn, int debitLimit);
|
||||
LoadManager(int creditRate = 10, int creditLimit = 50, int debitWarn = -50, int debitLimit = -100);
|
||||
|
||||
int getCreditRate() const;
|
||||
int getCreditLimit() const;
|
||||
@@ -103,6 +104,7 @@ public:
|
||||
bool shouldWarn(LoadSource&) const;
|
||||
bool shouldCutoff(LoadSource&) const;
|
||||
bool adjust(LoadSource&, int credits) const; // return value: false=balance okay, true=warn/cutoff
|
||||
bool adjust(LoadSource&, LoadType l) const;
|
||||
|
||||
int getCost(LoadType t) { return mCosts[static_cast<int>(t)].mCost; }
|
||||
};
|
||||
|
||||
@@ -600,11 +600,11 @@ void Peer::processReadBuffer()
|
||||
}
|
||||
}
|
||||
|
||||
void Peer::punishPeer(const boost::weak_ptr<Peer>& wp, PeerPunish pp)
|
||||
void Peer::punishPeer(const boost::weak_ptr<Peer>& wp, LoadType l)
|
||||
{
|
||||
Peer::pointer p = wp.lock();
|
||||
if (p)
|
||||
p->punishPeer(pp);
|
||||
p->punishPeer(l);
|
||||
}
|
||||
|
||||
void Peer::recvHello(ripple::TMHello& packet)
|
||||
@@ -743,7 +743,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx
|
||||
if (tx->getStatus() == INVALID)
|
||||
{
|
||||
theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD);
|
||||
Peer::punishPeer(peer, PP_BAD_SIGNATURE);
|
||||
Peer::punishPeer(peer, LT_InvalidSignature);
|
||||
return;
|
||||
}
|
||||
else
|
||||
@@ -759,7 +759,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx
|
||||
catch (...)
|
||||
{
|
||||
theApp->getSuppression().setFlags(stx->getTransactionID(), SF_BAD);
|
||||
punishPeer(peer, PP_INVALID_REQUEST);
|
||||
punishPeer(peer, LT_InvalidRequest);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@@ -783,7 +783,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
|
||||
{ // we have seen this transaction recently
|
||||
if ((flags & SF_BAD) != 0)
|
||||
{
|
||||
punishPeer(PP_BAD_SIGNATURE);
|
||||
punishPeer(LT_InvalidSignature);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -828,7 +828,7 @@ static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packe
|
||||
if (!proposal->checkSign(set.signature()))
|
||||
{
|
||||
cLog(lsWARNING) << "proposal with previous ledger fails signature check";
|
||||
Peer::punishPeer(peer, PP_BAD_SIGNATURE);
|
||||
Peer::punishPeer(peer, LT_InvalidSignature);
|
||||
return;
|
||||
}
|
||||
else
|
||||
@@ -874,14 +874,14 @@ void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
|
||||
(set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128))
|
||||
{
|
||||
cLog(lsWARNING) << "Received proposal is malformed";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidSignature);
|
||||
return;
|
||||
}
|
||||
|
||||
if (set.has_previousledger() && (set.previousledger().size() != 32))
|
||||
{
|
||||
cLog(lsWARNING) << "Received proposal is malformed";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -930,7 +930,7 @@ void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet)
|
||||
uint256 hashes;
|
||||
if (packet.hash().size() != (256 / 8))
|
||||
{
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
uint256 hash;
|
||||
@@ -938,7 +938,7 @@ void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet)
|
||||
if (packet.status() == ripple::tsHAVE)
|
||||
addTxSet(hash);
|
||||
if (!theApp->getOPs().hasTXSet(shared_from_this(), hash, packet.status()))
|
||||
punishPeer(PP_UNWANTED_DATA);
|
||||
punishPeer(LT_UnwantedData);
|
||||
}
|
||||
|
||||
static void checkValidation(Job&, SerializedValidation::pointer val, uint256 signingHash,
|
||||
@@ -951,7 +951,7 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
|
||||
if (!val->isValid(signingHash))
|
||||
{
|
||||
cLog(lsWARNING) << "Validation is invalid";
|
||||
Peer::punishPeer(peer, PP_UNKNOWN_REQUEST);
|
||||
Peer::punishPeer(peer, LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -966,7 +966,7 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
|
||||
catch (...)
|
||||
{
|
||||
cLog(lsWARNING) << "Exception processing validation";
|
||||
Peer::punishPeer(peer, PP_UNKNOWN_REQUEST);
|
||||
Peer::punishPeer(peer, LT_InvalidRequest);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@@ -976,7 +976,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
|
||||
if (packet->validation().size() < 50)
|
||||
{
|
||||
cLog(lsWARNING) << "Too small validation from peer";
|
||||
punishPeer(PP_UNKNOWN_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1004,7 +1004,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
|
||||
catch (...)
|
||||
{
|
||||
cLog(lsWARNING) << "Exception processing validation";
|
||||
punishPeer(PP_UNKNOWN_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@@ -1138,10 +1138,32 @@ void Peer::recvAccount(ripple::TMAccount& packet)
|
||||
|
||||
void Peer::recvProofWork(ripple::TMProofWork& packet)
|
||||
{
|
||||
if (packet.has_response())
|
||||
{ // this is an answer to a proof of work we requested
|
||||
if (packet.response().size() != (256 / 8))
|
||||
{
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
uint256 response;
|
||||
memcpy(response.begin(), packet.response().data(), 256 / 8);
|
||||
POWResult r = theApp->getPowGen().checkProof(packet.token(), response);
|
||||
if (r == powOK)
|
||||
{
|
||||
// credit peer
|
||||
// WRITEME
|
||||
return;
|
||||
}
|
||||
// return error message
|
||||
// WRITEME
|
||||
if (r != powTOOEASY)
|
||||
punishPeer(LT_BadPoW);
|
||||
return;
|
||||
}
|
||||
|
||||
if (packet.has_result())
|
||||
{ // this is a reply to a proof of work we sent
|
||||
// WRITEME
|
||||
return;
|
||||
}
|
||||
|
||||
if (packet.has_target() && packet.has_challenge() && packet.has_iterations())
|
||||
@@ -1151,7 +1173,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet)
|
||||
uint256 challenge, target;
|
||||
if ((packet.challenge().size() != (256 / 8)) || (packet.target().size() != (256 / 8)))
|
||||
{
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
memcpy(challenge.begin(), packet.challenge().data(), 256 / 8);
|
||||
@@ -1160,7 +1182,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet)
|
||||
challenge, target);
|
||||
if (!pow->isValid())
|
||||
{
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1232,7 +1254,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
cLog(lsINFO) << "Received request for TX candidate set data " << getIP();
|
||||
if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32))
|
||||
{
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
cLog(lsWARNING) << "invalid request";
|
||||
return;
|
||||
}
|
||||
@@ -1263,7 +1285,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
return;
|
||||
}
|
||||
cLog(lsERROR) << "We do not have the map our peer wants";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
reply.set_ledgerseq(0);
|
||||
@@ -1281,7 +1303,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
uint256 ledgerhash;
|
||||
if (packet.ledgerhash().size() != 32)
|
||||
{
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
cLog(lsWARNING) << "Invalid request";
|
||||
return;
|
||||
}
|
||||
@@ -1326,14 +1348,14 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
}
|
||||
else
|
||||
{
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
cLog(lsWARNING) << "Can't figure out what ledger they want";
|
||||
return;
|
||||
}
|
||||
|
||||
if ((!ledger) || (packet.has_ledgerseq() && (packet.ledgerseq() != ledger->getLedgerSeq())))
|
||||
{
|
||||
punishPeer(PP_UNKNOWN_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
if (sLog(lsWARNING))
|
||||
{
|
||||
if (ledger)
|
||||
@@ -1391,7 +1413,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
if ((!map) || (packet.nodeids_size() == 0))
|
||||
{
|
||||
cLog(lsWARNING) << "Can't find map or empty request";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1401,7 +1423,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
if(!mn.isValid())
|
||||
{
|
||||
cLog(lsWARNING) << "Request for invalid node";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
std::vector<SHAMapNode> nodeIDs;
|
||||
@@ -1434,7 +1456,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
|
||||
if (packet.nodes().size() <= 0)
|
||||
{
|
||||
cLog(lsWARNING) << "Ledger/TXset data with no nodes";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1449,7 +1471,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
|
||||
else
|
||||
{
|
||||
cLog(lsINFO) << "Unable to route TX/ledger data reply";
|
||||
punishPeer(PP_UNWANTED_DATA);
|
||||
punishPeer(LT_UnwantedData);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -1460,7 +1482,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
|
||||
if(packet.ledgerhash().size() != 32)
|
||||
{
|
||||
cLog(lsWARNING) << "TX candidate reply with invalid hash size";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
|
||||
@@ -1475,7 +1497,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
|
||||
if (!node.has_nodeid() || !node.has_nodedata() || (node.nodeid().size() != 33))
|
||||
{
|
||||
cLog(lsWARNING) << "LedgerData request with invalid node ID";
|
||||
punishPeer(PP_INVALID_REQUEST);
|
||||
punishPeer(LT_InvalidRequest);
|
||||
return;
|
||||
}
|
||||
nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size()));
|
||||
@@ -1483,13 +1505,13 @@ void Peer::recvLedger(ripple::TMLedgerData& packet)
|
||||
}
|
||||
SMAddNode san = theApp->getOPs().gotTXData(shared_from_this(), hash, nodeIDs, nodeData);
|
||||
if (san.isInvalid())
|
||||
punishPeer(PP_UNWANTED_DATA);
|
||||
punishPeer(LT_UnwantedData);
|
||||
return;
|
||||
}
|
||||
|
||||
SMAddNode san = theApp->getMasterLedgerAcquire().gotLedgerData(packet, shared_from_this());
|
||||
if (san.isInvalid())
|
||||
punishPeer(PP_UNWANTED_DATA);
|
||||
punishPeer(LT_UnwantedData);
|
||||
}
|
||||
|
||||
bool Peer::hasLedger(const uint256& hash) const
|
||||
@@ -1603,8 +1625,12 @@ void Peer::sendGetPeers()
|
||||
sendPacket(packet);
|
||||
}
|
||||
|
||||
void Peer::punishPeer(PeerPunish)
|
||||
void Peer::punishPeer(LoadType l)
|
||||
{
|
||||
if (theApp->getLoadManager().adjust(mLoad, l))
|
||||
{
|
||||
// WRITEME
|
||||
}
|
||||
}
|
||||
|
||||
void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer pow)
|
||||
|
||||
@@ -15,22 +15,7 @@
|
||||
#include "InstanceCounter.h"
|
||||
#include "JobQueue.h"
|
||||
#include "ProofOfWork.h"
|
||||
|
||||
enum PeerPunish
|
||||
{
|
||||
PP_INVALID_REQUEST = 1, // The peer sent a request that makes no sense
|
||||
PP_UNKNOWN_REQUEST = 2, // The peer sent a request that might be garbage
|
||||
PP_UNWANTED_DATA = 3, // The peer sent us data we didn't want/need
|
||||
PP_BAD_SIGNATURE = 4, // Object had bad signature
|
||||
};
|
||||
|
||||
enum PeerReward
|
||||
{
|
||||
PR_NEEDED_DATA = 1, // The peer gave us some data we needed
|
||||
PR_NEW_TRANSACTION = 2, // The peer gave us a new transaction
|
||||
PR_FIRST_USEFUL = 3, // The peer was first to give us something like a trusted proposal
|
||||
PR_USEFUL = 4 // The peer gave us a trusted proposal, just not quite first
|
||||
};
|
||||
#include "LoadManager.h"
|
||||
|
||||
typedef std::pair<std::string,int> ipPort;
|
||||
|
||||
@@ -57,6 +42,7 @@ private:
|
||||
uint256 mCookieHash;
|
||||
uint64 mPeerId;
|
||||
bool mPrivate; // Keep peer IP private.
|
||||
LoadSource mLoad;
|
||||
|
||||
uint256 mClosedLedgerHash, mPreviousLedgerHash;
|
||||
std::list<uint256> mRecentLedgers;
|
||||
@@ -151,8 +137,8 @@ public:
|
||||
void sendGetFullLedger(uint256& hash);
|
||||
void sendGetPeers();
|
||||
|
||||
void punishPeer(PeerPunish pp);
|
||||
static void punishPeer(const boost::weak_ptr<Peer>&, PeerPunish);
|
||||
void punishPeer(LoadType);
|
||||
static void punishPeer(const boost::weak_ptr<Peer>&, LoadType);
|
||||
|
||||
Json::Value getJson();
|
||||
bool isConnected() const { return mHelloed && !mDetaching; }
|
||||
|
||||
Reference in New Issue
Block a user