diff --git a/SConstruct b/SConstruct index 820d9e2328..34908985e5 100644 --- a/SConstruct +++ b/SConstruct @@ -68,7 +68,7 @@ PROTO_SRCS = env.Protoc([], 'src/newcoin.proto', PROTOCOUTDIR='obj', PROTOCP env.Clean(PROTO_SRCS, 'site_scons/site_tools/protoc.pyc') # Remove unused source files. -UNUSED_SRCS = ['src/HttpReply.cpp', 'src/ValidationCollection.cpp'] +UNUSED_SRCS = ['src/HttpReply.cpp'] for file in UNUSED_SRCS: NEWCOIN_SRCS.remove(file) diff --git a/newcoind.cfg b/newcoind.cfg index 77c942b2a8..b97fef5f2a 100644 --- a/newcoind.cfg +++ b/newcoind.cfg @@ -119,3 +119,6 @@ [validation_seed] snTBDmrhUK3znvF8AaQURLm4UCkbS + +[unl_default] +validators.txt diff --git a/src/Application.h b/src/Application.h index 1771aa63e4..7d9ada8ea5 100644 --- a/src/Application.h +++ b/src/Application.h @@ -13,6 +13,7 @@ #include "Peer.h" #include "NetworkOPs.h" #include "TaggedCache.h" +#include "ValidationCollection.h" #include "../database/database.h" @@ -44,6 +45,7 @@ class Application TransactionMaster mMasterTransaction; NetworkOPs mNetOps; NodeCache mNodeCache; + ValidationCollection mValidations; DatabaseCon *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; @@ -74,6 +76,7 @@ public: LedgerAcquireMaster& getMasterLedgerAcquire() { return mMasterLedgerAcquire; } TransactionMaster& getMasterTransaction() { return mMasterTransaction; } NodeCache& getNodeCache() { return mNodeCache; } + ValidationCollection& getValidations() { return mValidations; } DatabaseCon* getTxnDB() { return mTxnDB; } DatabaseCon* getLedgerDB() { return mLedgerDB; } diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 728795c41b..cfdad9432a 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -11,6 +11,9 @@ #include #include +// How often to enforce policies. +#define POLICY_INTERVAL_SECONDS 5 + void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) { std::vector vIpPort; @@ -23,7 +26,8 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : mCtx(boost::asio::ssl::context::sslv23), bScanning(false), - mScanTimer(io_service) + mScanTimer(io_service), + mPolicyTimer(io_service) { mCtx.set_options( boost::asio::ssl::context::default_workarounds @@ -36,7 +40,7 @@ ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : void ConnectionPool::start() { - // XXX Start running policy. + // Start running policy. policyEnforce(); // Start scanning. @@ -157,7 +161,41 @@ void ConnectionPool::policyLowWater() void ConnectionPool::policyEnforce() { + boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); + + std::cerr << "policyEnforce: begin: " << tpNow << std::endl; + + // Cancel any in progrss timer. + (void) mPolicyTimer.cancel(); + + // Enforce policies. policyLowWater(); + + // Schedule next enforcement. + boost::posix_time::ptime tpNext; + + tpNext = boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS); + + std::cerr << "policyEnforce: schedule : " << tpNext << std::endl; + + mPolicyTimer.expires_at(tpNext); + mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1)); +} + +void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) +{ + if (ecResult == boost::asio::error::operation_aborted) + { + nothing(); + } + else if (!ecResult) + { + policyEnforce(); + } + else + { + throw std::runtime_error("Internal error: unexpected deadline error."); + } } // XXX Broken: also don't send a message to a peer if we got it from the peer. diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 0c84c20971..656bbf1b94 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -34,6 +34,10 @@ private: void scanHandler(const boost::system::error_code& ecResult); + boost::asio::deadline_timer mPolicyTimer; + + void policyHandler(const boost::system::error_code& ecResult); + // Peers we are establishing a connection with as a client. // int miConnectStarting; diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 2883864dba..7c84b258cf 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -281,7 +281,7 @@ void LedgerConsensus::mapComplete(const uint256& hash, SHAMap::pointer map) } if (!peers.empty()) adjustCount(map, peers); - else + else if (!hash) Log(lsWARNING) << "By the time we got the map, no peers were proposing it"; sendHaveTxSet(hash, true); @@ -807,8 +807,11 @@ void LedgerConsensus::accept(SHAMap::pointer set) } #endif - SerializedValidation v(newLCLHash, mOurPosition->peekSeed(), true); - std::vector validation = v.getSigned(); + SerializedValidation::pointer v = boost::make_shared + (newLCLHash, mOurPosition->peekSeed(), true); + v->setTrusted(); + theApp->getValidations().addValidation(v); + std::vector validation = v->getSigned(); newcoin::TMValidation val; val.set_validation(&validation[0], validation.size()); theApp->getConnectionPool().relayMessage(NULL, boost::make_shared(val, newcoin::mtVALIDATION)); diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 479cbb35ab..60d5b226a4 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -308,11 +308,18 @@ void NetworkOPs::checkState(const boost::system::error_code& result) if (!!peerLedger) { // FIXME: If we have this ledger, don't count it if it's too far past its close time + bool isNew = ledgers.find(peerLedger) == ledgers.end(); ValidationCount& vc = ledgers[peerLedger]; + if (isNew) + { + theApp->getValidations().getValidationCount(peerLedger, + vc.trustedValidations, vc.untrustedValidations); + Log(lsTRACE) << peerLedger.GetHex() << " has " << vc.trustedValidations << + " trusted validations and " << vc.untrustedValidations << " untrusted"; + } if ((vc.nodesUsing == 0) || ((*it)->getNodePublic() > vc.highNode)) vc.highNode = (*it)->getNodePublic(); ++vc.nodesUsing; - // WRITEME: Validations, trusted peers } } } @@ -533,8 +540,8 @@ void NetworkOPs::setMode(OperatingMode om) if (mMode == om) return; Log l((om < mMode) ? lsWARNING : lsINFO); if (om == omDISCONNECTED) l << "STATE->Disonnected"; - else if (om==omCONNECTED) l << "STATE->Connected"; - else if (om==omTRACKING) l << "STATE->Tracking"; + else if (om == omCONNECTED) l << "STATE->Connected"; + else if (om == omTRACKING) l << "STATE->Tracking"; else l << "STATE->Full"; mMode = om; } @@ -562,4 +569,9 @@ std::vector< std::pair > return affectedAccounts; } +bool NetworkOPs::recvValidation(SerializedValidation::pointer val) +{ + return theApp->getValidations().addValidation(val); +} + // vim:ts=4 diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 0514a0902a..5fdea311ed 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -5,6 +5,7 @@ #include "AccountState.h" #include "RippleState.h" #include "NicknameState.h" +#include "SerializedValidation.h" // #include @@ -115,6 +116,7 @@ public: const std::string& pubKey, const std::string& signature); bool gotTXData(boost::shared_ptr peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); + bool recvValidation(SerializedValidation::pointer val); SHAMap::pointer getTXMap(const uint256& hash); bool hasTXSet(boost::shared_ptr peer, const uint256& set, newcoin::TxSetStatus status); void mapComplete(const uint256& hash, SHAMap::pointer map); diff --git a/src/Peer.cpp b/src/Peer.cpp index a81196b140..6cfd008437 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -595,7 +595,7 @@ void Peer::recvTransaction(newcoin::TMTransaction& packet) { #endif std::string rawTx = packet.rawtransaction(); - Serializer s(std::vector(rawTx.begin(), rawTx.end())); + Serializer s(rawTx); SerializerIterator sit(s); SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); @@ -668,6 +668,31 @@ void Peer::recvHaveTxSet(newcoin::TMHaveTransactionSet& packet) void Peer::recvValidation(newcoin::TMValidation& packet) { + if (packet.validation().size() < 50) + { + punishPeer(PP_UNKNOWN_REQUEST); + return; + } + try + { + Serializer s(packet.validation()); + SerializerIterator sit(s); + SerializedValidation::pointer val = boost::make_shared(boost::ref(sit)); + if (!val->isValid()) + { + punishPeer(PP_UNKNOWN_REQUEST); + return; + } + if (theApp->getOPs().recvValidation(val)) + { + PackedMessage::pointer message = boost::make_shared(packet, newcoin::mtVALIDATION); + theApp->getConnectionPool().relayMessage(this, message); + } + } + catch (...) + { + punishPeer(PP_UNKNOWN_REQUEST); + } } void Peer::recvGetValidation(newcoin::TMGetValidations& packet) diff --git a/src/SerializedValidation.cpp b/src/SerializedValidation.cpp index a6c8e8c040..7647be4833 100644 --- a/src/SerializedValidation.cpp +++ b/src/SerializedValidation.cpp @@ -13,13 +13,13 @@ const uint32 SerializedValidation::sFullFlag = 0x00010000; const uint32 SerializedValidation::sValidationMagic = 0x4c575200; // "LGR" SerializedValidation::SerializedValidation(SerializerIterator& sit, bool checkSignature) - : STObject(sValidationFormat, sit), mSignature(sit, "Signature") + : STObject(sValidationFormat, sit), mSignature(sit, "Signature"), mTrusted(false) { if (!isValid()) throw std::runtime_error("Invalid validation"); } SerializedValidation::SerializedValidation(const uint256& ledgerHash, const NewcoinAddress& naSeed, bool isFull) - : STObject(sValidationFormat), mSignature("Signature") + : STObject(sValidationFormat), mSignature("Signature"), mTrusted(false) { setValueFieldH256(sfLedgerHash, ledgerHash); setValueFieldVL(sfSigningKey, NewcoinAddress::createNodePublic(naSeed).getNodePublic()); diff --git a/src/SerializedValidation.h b/src/SerializedValidation.h index 594504c874..987a883234 100644 --- a/src/SerializedValidation.h +++ b/src/SerializedValidation.h @@ -8,6 +8,9 @@ class SerializedValidation : public STObject { protected: STVariableLength mSignature; + bool mTrusted; + + void setNode(); public: typedef boost::shared_ptr pointer; @@ -26,9 +29,11 @@ public: NewcoinAddress getSignerPublic() const; bool isValid() const; bool isFull() const; + bool isTrusted() const { return mTrusted; } CKey::pointer getSigningKey() const; uint256 getSigningHash() const; + void setTrusted() { mTrusted = true; } void addSigned(Serializer&) const; void addSignature(Serializer&) const; std::vector getSigned() const; diff --git a/src/Serializer.h b/src/Serializer.h index f21940cf8a..40613dd52b 100644 --- a/src/Serializer.h +++ b/src/Serializer.h @@ -24,6 +24,10 @@ public: Serializer(int n = 256) { mData.reserve(n); } Serializer(const std::vector &data) : mData(data) { ; } Serializer(const std::string& data) : mData(data.data(), (data.data()) + data.size()) { ; } + Serializer(std::vector::iterator begin, std::vector::iterator end) : + mData(begin, end) { ; } + Serializer(std::vector::const_iterator begin, std::vector::const_iterator end) : + mData(begin, end) { ; } // assemble functions int add8(unsigned char byte); diff --git a/src/UniqueNodeList.cpp b/src/UniqueNodeList.cpp index ae36513bce..c49bfc70e7 100644 --- a/src/UniqueNodeList.cpp +++ b/src/UniqueNodeList.cpp @@ -1532,7 +1532,7 @@ bool UniqueNodeList::nodeLoad(boost::filesystem::path pConfig) return false; } - nodeProcess("local", strValidators, pConfig.native()); + nodeProcess("local", strValidators, pConfig.string()); std::cerr << str(boost::format("Processing: %s") % pConfig) << std::endl; diff --git a/src/ValidationCollection.cpp b/src/ValidationCollection.cpp index 26dcc13de0..7da2747365 100644 --- a/src/ValidationCollection.cpp +++ b/src/ValidationCollection.cpp @@ -1,226 +1,50 @@ + #include "ValidationCollection.h" + #include "Application.h" -#include "Config.h" -#include "Conversion.h" -#include "Application.h" -#include -using namespace std; +#include "Log.h" -/* -We need to group validations into compatible groups. -We can make one super ledger of all the transactions in each compatible group. -Then we just have to check this ledger to see if a new ledger is compatible -This is also the ledger we hand back when we ask for the consensus ledger - -*/ -ValidationCollection::ValidationCollection() +bool ValidationCollection::addValidation(SerializedValidation::pointer val) { + if(theApp->getUNL().nodeInUNL(val->getSignerPublic())) + val->setTrusted(); + uint256 hash = val->getLedgerHash(); + uint160 node = val->getSignerPublic().getNodeID(); + + boost::mutex::scoped_lock sl(mValidationLock); + bool ret = mValidations[hash].insert(std::make_pair(node, val)).second; + if (ret) + Log(lsINFO) << "Val for " << hash.GetHex() << " from " << node.GetHex() << " added " << + (val->isTrusted() ? "trusted" : "UNtrusted"); + return ret; } -void ValidationCollection::save() +ValidationSet ValidationCollection::getValidations(const uint256& ledger) { - -} -void ValidationCollection::load() -{ - -} - -void ValidationCollection::addToDB(newcoin::Validation& valid,int weCare) -{ - Database* db=theApp->getDB(); - uint256 hash=protobufTo256(valid.hash()); - uint160 hanko=protobufTo160(valid.hanko()); - uint256 sig=protobufTo256(valid.sig()); - - string hashStr,hankoStr,sigStr; - db->escape(hash.begin(),hash.GetSerializeSize(),hashStr); - db->escape(hanko.begin(),hanko.GetSerializeSize(),hankoStr); - db->escape(sig.begin(),sig.GetSerializeSize(),sigStr); - string sql=strprintf("INSERT INTO Validations (LedgerIndex,Hash,Hanko,SeqNum,Sig,WeCare) values (%d,%s,%s,%d,%s,%d)",valid.ledgerindex(),hashStr.c_str(),hankoStr.c_str(),valid.seqnum(),sigStr.c_str(),weCare); - db->executeSQL(sql); - -} - -bool ValidationCollection::hasValidation(uint32 ledgerIndex,uint160& hanko,uint32 seqnum) -{ - string hankoStr; - Database* db=theApp->getDB(); - db->escape(hanko.begin(),hanko.GetSerializeSize(),hankoStr); - string sql=strprintf("SELECT ValidationID,seqnum from Validations where LedgerIndex=%d and hanko=%s", - ledgerIndex,hankoStr.c_str()); - if(db->executeSQL(sql) && db->startIterRows()) + ValidationSet ret; { - uint32 currentSeqNum=db->getInt(1); - if(currentSeqNum>=seqnum) + boost::mutex::scoped_lock sl(mValidationLock); + boost::unordered_map::iterator it = mValidations.find(ledger); + if (it != mValidations.end()) ret = it->second; + } + return ret; +} + +void ValidationCollection::getValidationCount(const uint256& ledger, int& trusted, int &untrusted) +{ + trusted = untrusted = 0; + boost::mutex::scoped_lock sl(mValidationLock); + boost::unordered_map::iterator it = mValidations.find(ledger); + if (it != mValidations.end()) + { + for (ValidationSet::iterator vit = it->second.begin(), end = it->second.end(); + vit != end; ++vit) { - db->endIterRows(); - return(true); - } - // delete the old validation we were storing - sql=strprintf("DELETE FROM Validations where ValidationID=%d",db->getInt(0)); - db->endIterRows(); - db->executeSQL(sql); - } - return(false); -} - -// TODO: we are adding our own validation -// TODO: when do we check if we are with the consensus? -// TODO: throw out lower seqnums -void ValidationCollection::addValidation(newcoin::Validation& valid) -{ - // TODO: make sure the validation is valid - - uint256 hash=protobufTo256(valid.hash()); - uint160 hanko=protobufTo160(valid.hanko()); - - // make sure we don't already have this validation - if(hasValidation(valid.ledgerindex(),hanko,valid.seqnum())) return; - - // check if we care about this hanko - int validity=theApp->getUNL().checkValid(valid); - if( validity==1 ) - { - addToDB(valid,true); - addToGroup(valid); - theApp->getLedgerMaster().checkConsensus(valid.ledgerindex()); - }else if(validity==0) - { - addToDB(valid,false); - }else - { // the signature wasn't valid - cout << "Invalid Validation" << endl; - } -} - - -bool ValidationCollection::Group::addIfCompatible(Ledger::pointer ledger,newcoin::Validation& valid) -{ - if(mSuperLedger) - { - if(mSuperLedger->isCompatible(ledger)) - { - mValidations.push_back(valid); - mSuperLedger->mergeIn(ledger); + if(vit->second->isTrusted()) + ++trusted; + else + ++untrusted; } } - return(false); } - -// TODO: optimize. We can at least cache what ledgers are compatible -// a validation can be in multiple groups since compatibility isn't transitive -// Sometimes things are just complex -void ValidationCollection::addToGroup(newcoin::Validation& newValid) -{ - - if(mIndexGroups.count(newValid.ledgerindex())) - { - bool canReturn=false; - // see if this hash is already on the list. If so add it there. - vector< Group >& groups=mIndexGroups[newValid.ledgerindex()]; - BOOST_FOREACH(Group& group,groups) - { // FIXME: Cannot modify *at* *all* inside a BOOST_FOREACH - BOOST_FOREACH(newcoin::Validation& valid,group.mValidations) - { - if(valid.hash()==newValid.hash()) - { - group.mValidations.push_back(newValid); - canReturn=true; - break; - } - } - } - - if(canReturn) return; - // this is a validation of a new ledger hash - - uint256 newHash=protobufTo256(newValid.hash()); - Ledger::pointer newLedger=theApp->getLedgerMaster().getLedger(newHash); - if(newLedger) - { // see if this ledger is compatible with any groups - bool foundGroup=false; - BOOST_FOREACH(Group& group,groups) - { - if(group.addIfCompatible(newLedger,newValid)) foundGroup=true; - } - - if(!foundGroup) - { // this validation didn't fit in any of the other groups - // we need to make a new group for it and see what validations fit it - Group& newGroup=mIndexGroups[newValid.ledgerindex()][groups.size()]; - - newGroup.mValidations.push_back(newValid); - newGroup.mSuperLedger=Ledger::pointer(new Ledger(newLedger)); // since this super ledger gets modified and we don't want to screw the original - - vector retVec; - getValidations(newValid.ledgerindex(),retVec); - - - BOOST_FOREACH(newcoin::Validation& valid,retVec) - { - uint256 hash=protobufTo256(valid.hash()); - Ledger::pointer ledger=theApp->getLedgerMaster().getLedger(hash); - newGroup.addIfCompatible(ledger,valid); - } - } - - }else - { // we don't have a ledger for this validation - // add to its own group since we can't check if it is compatible - int newIndex=groups.size(); - mIndexGroups[newValid.ledgerindex()][newIndex].mValidations.push_back(newValid); - } - }else - { // this is the first validation of this ledgerindex - uint256 newHash=protobufTo256(newValid.hash()); - mIndexGroups[newValid.ledgerindex()][0].mValidations.push_back(newValid); - mIndexGroups[newValid.ledgerindex()][0].mSuperLedger=theApp->getLedgerMaster().getLedger(newHash); - } -} - -void ValidationCollection::getValidations(uint32 ledgerIndex,vector& retVec) -{ - string sql=strprintf("SELECT * From Validations where LedgerIndex=%d and wecare=1",ledgerIndex); - - // TODO: ValidationCollection::getValidations(uint32 ledgerIndex) -} - - -// look through all the validated hashes at that index -// put the ledgers into compatible groups -// Pick the group with the most votes -bool ValidationCollection::getConsensusLedger(uint32 ledgerIndex, uint256& ourHash, Ledger::pointer& retLedger, uint256& retHash) -{ - bool ret=false; - if(mIndexGroups.count(ledgerIndex)) - { - unsigned int maxVotes=theConfig.MIN_VOTES_FOR_CONSENSUS; - vector< Group >& groups=mIndexGroups[ledgerIndex]; - Group empty; - Group& maxGroup=empty; - BOOST_FOREACH(Group& group, groups) - { - if(group.mValidations.size()>maxVotes) - { - maxVotes=group.mValidations.size(); - retLedger=group.mSuperLedger; - maxGroup=group; - if(!retLedger) retHash=protobufTo256(group.mValidations[0].hash()); - ret=true; - } - } - if(ret) - { - // should also return false if we are in the consensus - BOOST_FOREACH(newcoin::Validation& valid, maxGroup.mValidations) - { - if(protobufTo256(valid.hash()) == ourHash) return(false); - } - } - } - - return(ret); -} -// vim:ts=4 diff --git a/src/ValidationCollection.h b/src/ValidationCollection.h index eb25be859e..3f41e0fc0f 100644 --- a/src/ValidationCollection.h +++ b/src/ValidationCollection.h @@ -1,56 +1,28 @@ #ifndef __VALIDATION_COLLECTION__ #define __VALIDATION_COLLECTION__ -#include "../obj/src/newcoin.pb.h" +#include +#include + #include "uint256.h" #include "types.h" -#include "Ledger.h" -#include +#include "SerializedValidation.h" + +typedef boost::unordered_map ValidationSet; class ValidationCollection { + protected: - // from ledger hash to the validation - //std::map > mValidations; - //std::map > mIgnoredValidations; + boost::mutex mValidationLock; + boost::unordered_map mValidations; - // this maps ledgerIndex to an array of groups. Each group is a list of validations. - // a validation can be in multiple groups since compatibility isn't transitive - // - class Group - { - public: - std::vector mValidations; - Ledger::pointer mSuperLedger; - - bool addIfCompatible(Ledger::pointer ledger,newcoin::Validation& valid); - }; - - std::map > mIndexGroups; // all the groups at each index - //std::map > mIndexValidations; // all the validations at each index - - bool hasValidation(uint32 ledgerIndex,uint160& hanko,uint32 seqnum); - void addToGroup(newcoin::Validation& valid); - void addToDB(newcoin::Validation& valid,int weCare); public: - ValidationCollection(); + ValidationCollection() { ; } - void save(); - void load(); - - void addValidation(newcoin::Validation& valid); - - void getValidations(uint32 ledgerIndex,std::vector& retVec); - - - // It can miss some compatible ledgers of course if you don't know them - - // fills out retLedger if there is a consensusLedger you can check - // fills out retHash if there isn't a consensusLedger to check. We need to fetch this ledger - // returns false if there isn't a consensus yet or we are in the consensus - bool getConsensusLedger(uint32 ledgerIndex, uint256& ourHash, Ledger::pointer& retLedger, uint256& retHash); - - int getSeqNum(uint32 ledgerIndex); + bool addValidation(SerializedValidation::pointer); + ValidationSet getValidations(const uint256& ledger); + void getValidationCount(const uint256& ledger, int& trusted, int& untrusted); }; #endif