About half of clustering support. We need this so our own nodes don't send each other proof

of work requests when we're under load.
This commit is contained in:
JoelKatz
2013-01-12 01:41:46 -08:00
parent 0d49bc877e
commit 0fabbc4f18
6 changed files with 98 additions and 30 deletions

View File

@@ -123,6 +123,16 @@
# Examples: RASH BUSH MILK LOOK BAD BRIM AVID GAFF BAIT ROT POD LOVE
# shfArahZT9Q9ckTf3s1psJ7C7qzVN
#
# [node_seed]:
# To force a particular node seed or key, the key can be set here.
# The format is the same as the validation_seed field. The need is used for clustering.
# Node seeds start with an 's'.
#
# [cluster_nodes]:
# To extend full trust to other nodes, place their node public keys here.
# Generally, you should only do this for nodes under common administration.
# Node public keys start with an 'n'.
#
# [ledger_history]:
# To serve clients, servers need historical ledger data. This sets the number of
# past ledgers to acquire on server startup and the minimum to maintain while

View File

@@ -13,6 +13,7 @@
#include <algorithm>
#define SECTION_ACCOUNT_PROBE_MAX "account_probe_max"
#define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_DATABASE_PATH "database_path"
#define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_FEE_DEFAULT "fee_default"
@@ -24,6 +25,7 @@
#define SECTION_LEDGER_HISTORY "ledger_history"
#define SECTION_IPS "ips"
#define SECTION_NETWORK_QUORUM "network_quorum"
#define SECTION_NODE_SEED "node_seed"
#define SECTION_PEER_CONNECT_LOW_WATER "peer_connect_low_water"
#define SECTION_PEER_IP "peer_ip"
#define SECTION_PEER_PORT "peer_port"
@@ -247,6 +249,13 @@ void Config::load()
// sectionEntriesPrint(&VALIDATORS, SECTION_VALIDATORS);
}
smtTmp = sectionEntries(secConfig, SECTION_CLUSTER_NODES);
if (smtTmp)
{
CLUSTER_NODES = *smtTmp;
// sectionEntriesPrint(&CLUSTER_NODES, SECTION_CLUSTER_NODES);
}
smtTmp = sectionEntries(secConfig, SECTION_IPS);
if (smtTmp)
{
@@ -310,6 +319,15 @@ void Config::load()
VALIDATION_PRIV = RippleAddress::createNodePrivate(VALIDATION_SEED);
}
}
if (sectionSingleB(secConfig, SECTION_NODE_SEED, strTemp))
{
NODE_SEED.setSeedGeneric(strTemp);
if (NODE_SEED.isValid())
{
NODE_PUB = RippleAddress::createNodePublic(NODE_SEED);
NODE_PRIV = RippleAddress::createNodePrivate(NODE_SEED);
}
}
(void) sectionSingleB(secConfig, SECTION_PEER_SSL_CIPHER_LIST, PEER_SSL_CIPHER_LIST);

View File

@@ -114,6 +114,10 @@ public:
// Validation
RippleAddress VALIDATION_SEED, VALIDATION_PUB, VALIDATION_PRIV;
// Node/Cluster
std::vector<std::string> CLUSTER_NODES;
RippleAddress NODE_SEED, NODE_PUB, NODE_PRIV;
// Fee schedule (All below values are in fee units)
uint64 FEE_DEFAULT; // Default fee.
uint64 FEE_ACCOUNT_RESERVE; // Amount of units not allowed to send.

View File

@@ -64,7 +64,16 @@ void UniqueNodeList::start()
// Load information about when we last updated.
bool UniqueNodeList::miscLoad()
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
BOOST_FOREACH(const std::string& node, theConfig.CLUSTER_NODES)
{
RippleAddress a = RippleAddress::createNodePublic(node);
if (a.isValid())
sClusterNodes.insert(a);
else
cLog(lsWARNING) << "Entry in cluster list invalid: '" << node << "'";
}
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
if (!db->executeSQL("SELECT * FROM Misc WHERE Magic=1;")) return false;
@@ -85,7 +94,7 @@ bool UniqueNodeList::miscLoad()
bool UniqueNodeList::miscSave()
{
Database* db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("REPLACE INTO Misc (Magic,FetchUpdated,ScoreUpdated) VALUES (1,%d,%d);")
% iToSeconds(mtpFetchUpdated)
@@ -96,9 +105,18 @@ bool UniqueNodeList::miscSave()
void UniqueNodeList::trustedLoad()
{
BOOST_FOREACH(const std::string& node, theConfig.CLUSTER_NODES)
{
RippleAddress a = RippleAddress::createNodePublic(node);
if (a.isValid())
sClusterNodes.insert(a);
else
cLog(lsWARNING) << "Entry in cluster list invalid: '" << node << "'";
}
Database* db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
ScopedLock slUNL(mUNLLock);
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock slUNL(mUNLLock);
mUNL.clear();
@@ -187,7 +205,7 @@ void UniqueNodeList::scoreCompute()
// For each entry in SeedDomains with a PublicKey:
// - Add an entry in umPulicIdx, umDomainIdx, and vsnNodes.
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, "SELECT Domain,PublicKey,Source FROM SeedDomains;")
{
@@ -240,7 +258,7 @@ void UniqueNodeList::scoreCompute()
// For each entry in SeedNodes:
// - Add an entry in umPulicIdx, umDomainIdx, and vsnNodes.
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, "SELECT PublicKey,Source FROM SeedNodes;")
{
@@ -304,7 +322,7 @@ void UniqueNodeList::scoreCompute()
std::string& strValidator = sn.strValidator;
std::vector<int>& viReferrals = sn.viReferrals;
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, boost::str(boost::format("SELECT Referral FROM ValidatorReferrals WHERE Validator=%s ORDER BY Entry;")
% sqlEscape(strValidator)))
@@ -385,7 +403,7 @@ void UniqueNodeList::scoreCompute()
}
// Persist validator scores.
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL("BEGIN;");
db->executeSQL("UPDATE TrustedNodes SET Score = 0 WHERE Score != 0;");
@@ -436,7 +454,7 @@ void UniqueNodeList::scoreCompute()
}
{
ScopedLock sl(mUNLLock);
boost::recursive_mutex::scoped_lock sl(mUNLLock);
// XXX Should limit to scores above a certain minimum and limit to a certain number.
mUNL.swap(usUNL);
@@ -622,7 +640,7 @@ void UniqueNodeList::processIps(const std::string& strSite, const RippleAddress&
// Remove all current Validator's entries in IpReferrals
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("DELETE FROM IpReferrals WHERE Validator=%s;") % strEscNodePublic));
// XXX Check result.
}
@@ -664,7 +682,7 @@ void UniqueNodeList::processIps(const std::string& strSite, const RippleAddress&
{
vstrValues.resize(iValues);
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("INSERT INTO IpReferrals (Validator,Entry,IP,Port) VALUES %s;")
% strJoin(vstrValues.begin(), vstrValues.end(), ",")));
// XXX Check result.
@@ -693,7 +711,7 @@ int UniqueNodeList::processValidators(const std::string& strSite, const std::str
// Remove all current Validator's entries in ValidatorReferrals
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("DELETE FROM ValidatorReferrals WHERE Validator='%s';") % strNodePublic));
// XXX Check result.
@@ -764,7 +782,7 @@ int UniqueNodeList::processValidators(const std::string& strSite, const std::str
std::string strSql = str(boost::format("INSERT INTO ValidatorReferrals (Validator,Entry,Referral) VALUES %s;")
% strJoin(vstrValues.begin(), vstrValues.end(), ","));
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(strSql);
// XXX Check result.
@@ -1056,7 +1074,7 @@ void UniqueNodeList::fetchNext()
boost::posix_time::ptime tpNext;
boost::posix_time::ptime tpNow;
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
Database *db=theApp->getWalletDB()->getDB();
if (db->executeSQL("SELECT Domain,Next FROM SeedDomains INDEXED BY SeedDomainNext ORDER BY Next LIMIT 1;")
@@ -1156,7 +1174,7 @@ bool UniqueNodeList::getSeedDomains(const std::string& strDomain, seedDomain& ds
std::string strSql = boost::str(boost::format("SELECT * FROM SeedDomains WHERE Domain=%s;")
% sqlEscape(strDomain));
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
bResult = db->executeSQL(strSql) && db->startIterRows();
if (bResult)
@@ -1226,7 +1244,7 @@ void UniqueNodeList::setSeedDomains(const seedDomain& sdSource, bool bNext)
% sqlEscape(sdSource.strComment)
);
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
if (!db->executeSQL(strSql))
{
@@ -1294,7 +1312,7 @@ bool UniqueNodeList::getSeedNodes(const RippleAddress& naNodePublic, seedNode& d
std::string strSql = str(boost::format("SELECT * FROM SeedNodes WHERE PublicKey='%s';")
% naNodePublic.humanNodePublic());
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
bResult = db->executeSQL(strSql) && db->startIterRows();
if (bResult)
@@ -1366,7 +1384,7 @@ void UniqueNodeList::setSeedNodes(const seedNode& snSource, bool bNext)
);
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
if (!db->executeSQL(strSql))
{
@@ -1424,7 +1442,7 @@ void UniqueNodeList::nodeRemovePublic(const RippleAddress& naNodePublic)
{
{
Database* db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("DELETE FROM SeedNodes WHERE PublicKey=%s") % sqlEscape(naNodePublic.humanNodePublic())));
}
@@ -1440,7 +1458,7 @@ void UniqueNodeList::nodeRemoveDomain(std::string strDomain)
{
Database* db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("DELETE FROM SeedDomains WHERE Domain=%s") % sqlEscape(strDomain)));
}
@@ -1454,7 +1472,7 @@ void UniqueNodeList::nodeReset()
{
Database* db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
// XXX Check results.
db->executeSQL("DELETE FROM SeedDomains");
@@ -1470,7 +1488,7 @@ Json::Value UniqueNodeList::getUnlJson()
Json::Value ret(Json::arrayValue);
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
SQL_FOREACH(db, "SELECT * FROM TrustedNodes;")
{
Json::Value node(Json::objectValue);
@@ -1571,7 +1589,7 @@ void UniqueNodeList::nodeBootstrap()
Database* db = theApp->getWalletDB()->getDB();
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
if (db->executeSQL(str(boost::format("SELECT COUNT(*) AS Count FROM SeedDomains WHERE Source='%s' OR Source='%c';") % vsManual % vsValidator)) && db->startIterRows())
iDomains = db->getInt("Count");
@@ -1640,7 +1658,7 @@ void UniqueNodeList::nodeBootstrap()
if (!vstrValues.empty())
{
ScopedLock sl(theApp->getWalletDB()->getDBLock());
boost::recursive_mutex::scoped_lock sl(theApp->getWalletDB()->getDBLock());
db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Source) VALUES %s;")
% strJoin(vstrValues.begin(), vstrValues.end(), ",")));
@@ -1673,8 +1691,15 @@ void UniqueNodeList::nodeProcess(const std::string& strSite, const std::string&
bool UniqueNodeList::nodeInUNL(const RippleAddress& naNodePublic)
{
ScopedLock sl(mUNLLock);
boost::recursive_mutex::scoped_lock sl(mUNLLock);
return mUNL.end() != mUNL.find(naNodePublic.humanNodePublic());
}
bool UniqueNodeList::nodeInCluster(const RippleAddress& naNodePublic)
{
boost::recursive_mutex::scoped_lock sl(mUNLLock);
return sClusterNodes.count(naNodePublic) != 0;
}
// vim:ts=4

View File

@@ -2,6 +2,11 @@
#define __UNIQUE_NODE_LIST__
#include <deque>
#include <set>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include "../json/value.h"
@@ -10,11 +15,7 @@
#include "HttpsClient.h"
#include "ParseSection.h"
#include <boost/thread/recursive_mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
// Guarantees minimum thoughput of 1 node per second.
// Guarantees minimum throughput of 1 node per second.
#define NODE_FETCH_JOBS 10
#define NODE_FETCH_SECONDS 10
#define NODE_FILE_BYTES_MAX (50<<10) // 50k
@@ -87,6 +88,8 @@ private:
std::vector<int> viReferrals;
} scoreNode;
std::set<RippleAddress> sClusterNodes;
typedef boost::unordered_map<std::string,int> strIndex;
typedef std::pair<std::string,int> ipPort;
typedef boost::unordered_map<std::pair< std::string, int>, score> epScore;
@@ -151,6 +154,7 @@ public:
void nodeScore();
bool nodeInUNL(const RippleAddress& naNodePublic);
bool nodeInCluster(const RippleAddress& naNodePublic);
void nodeBootstrap();
bool nodeLoad(boost::filesystem::path pConfig);

View File

@@ -38,6 +38,7 @@ void Wallet::start()
// Retrieve network identity.
bool Wallet::nodeIdentityLoad()
{
Database* db=theApp->getWalletDB()->getDB();
ScopedLock sl(theApp->getWalletDB()->getDBLock());
bool bSuccess = false;
@@ -59,6 +60,12 @@ bool Wallet::nodeIdentityLoad()
bSuccess = true;
}
if (theConfig.NODE_PUB.isValid() && theConfig.NODE_PRIV.isValid())
{
mNodePublicKey = theConfig.NODE_PUB;
mNodePrivateKey = theConfig.NODE_PRIV;
}
return bSuccess;
}