From 58dcc8b9c19863d3d8d1ea40d04900135ddc501a Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 20 Jun 2012 13:26:32 -0700 Subject: [PATCH 1/9] Don't use stale validiations. --- src/LedgerConsensus.cpp | 4 ++-- src/NetworkOPs.cpp | 6 ++---- src/SerializedObject.h | 1 + src/SerializedValidation.cpp | 10 +++++++++- src/SerializedValidation.h | 5 +++-- src/ValidationCollection.cpp | 31 +++++++++++++++++++++++++++---- src/ValidationCollection.h | 3 ++- 7 files changed, 46 insertions(+), 14 deletions(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index acb269def5..fc6a798c0e 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -825,7 +825,7 @@ void LedgerConsensus::accept(SHAMap::pointer set) if (mValidating) { SerializedValidation::pointer v = boost::make_shared - (newLCLHash, mOurPosition->peekSeed(), true); + (newLCLHash, newLCL->getCloseTimeNC(), mOurPosition->peekSeed(), true); v->setTrusted(); // FIXME: If not proposing, set not full theApp->getValidations().addValidation(v); @@ -833,7 +833,7 @@ void LedgerConsensus::accept(SHAMap::pointer set) newcoin::TMValidation val; val.set_validation(&validation[0], validation.size()); theApp->getConnectionPool().relayMessage(NULL, boost::make_shared(val, newcoin::mtVALIDATION)); - Log(lsINFO) << "Validation sent " << newLCL->getHash().GetHex(); + Log(lsINFO) << "Validation sent " << newLCLHash.GetHex(); } else Log(lsWARNING) << "Not validating"; statusChange(newcoin::neACCEPTED_LEDGER, newLCL); diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 100c9d4a1b..bad18d8862 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -269,8 +269,6 @@ public: { if (trustedValidations > v.trustedValidations) return true; if (trustedValidations < v.trustedValidations) return false; - if (untrustedValidations > v.untrustedValidations) return true; - if (untrustedValidations < v.untrustedValidations) return false; if (nodesUsing > v.nodesUsing) return true; if (nodesUsing < v.nodesUsing) return false; return highNode > v.highNode; @@ -379,7 +377,7 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector& peerLis ValidationCount& vc = ledgers[peerLedger]; if (vc.nodesUsing == 0) { - theApp->getValidations().getValidationCount(peerLedger, + theApp->getValidations().getValidationCount(peerLedger, true, vc.trustedValidations, vc.untrustedValidations); Log(lsTRACE) << peerLedger.GetHex() << " has " << vc.trustedValidations << " trusted validations and " << vc.untrustedValidations << " untrusted"; @@ -397,7 +395,7 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector& peerLis if (ourVC.nodesUsing == 0) { ourVC.highNode = theApp->getWallet().getNodePublic(); - theApp->getValidations().getValidationCount(closedLedger, + theApp->getValidations().getValidationCount(closedLedger, true, ourVC.trustedValidations, ourVC.untrustedValidations); } ++ourVC.nodesUsing; diff --git a/src/SerializedObject.h b/src/SerializedObject.h index 3c322486da..ab115e1254 100644 --- a/src/SerializedObject.h +++ b/src/SerializedObject.h @@ -38,6 +38,7 @@ enum SOE_Field sfBorrowRate, sfBorrowStart, sfBorrower, + sfCloseTime, sfCurrency, sfCurrencyIn, sfCurrencyOut, diff --git a/src/SerializedValidation.cpp b/src/SerializedValidation.cpp index 209f870948..9c19bc7615 100644 --- a/src/SerializedValidation.cpp +++ b/src/SerializedValidation.cpp @@ -4,6 +4,7 @@ SOElement SerializedValidation::sValidationFormat[] = { { sfFlags, "Flags", STI_UINT32, SOE_FLAGS, 0 }, { sfLedgerHash, "LedgerHash", STI_HASH256, SOE_REQUIRED, 0 }, + { sfCloseTime, "CloseTime", STI_UINT64, SOE_REQUIRED, 0 }, { sfSigningKey, "SigningKey", STI_VL, SOE_REQUIRED, 0 }, { sfExtensions, "Extensions", STI_TL, SOE_IFFLAG, 0x01000000 }, { sfInvalid, NULL, STI_DONE, SOE_NEVER, -1 }, @@ -18,10 +19,12 @@ SerializedValidation::SerializedValidation(SerializerIterator& sit, bool checkSi if (!isValid()) throw std::runtime_error("Invalid validation"); } -SerializedValidation::SerializedValidation(const uint256& ledgerHash, const NewcoinAddress& naSeed, bool isFull) +SerializedValidation::SerializedValidation(const uint256& ledgerHash, uint64 closeTime, + const NewcoinAddress& naSeed, bool isFull) : STObject(sValidationFormat), mSignature("Signature"), mTrusted(false) { setValueFieldH256(sfLedgerHash, ledgerHash); + setValueFieldU64(sfCloseTime, closeTime); if (naSeed.isValid()) setValueFieldVL(sfSigningKey, NewcoinAddress::createNodePublic(naSeed).getNodePublic()); if (!isFull) setFlag(sFullFlag); @@ -47,6 +50,11 @@ uint256 SerializedValidation::getLedgerHash() const return getValueFieldH256(sfLedgerHash); } +uint64 SerializedValidation::getCloseTime() const +{ + return getValueFieldU64(sfCloseTime); +} + bool SerializedValidation::isValid() const { try diff --git a/src/SerializedValidation.h b/src/SerializedValidation.h index 987a883234..ab0886a6bf 100644 --- a/src/SerializedValidation.h +++ b/src/SerializedValidation.h @@ -23,9 +23,10 @@ public: SerializedValidation(SerializerIterator& sit, bool checkSignature = true); SerializedValidation(const Serializer& s, bool checkSignature = true); - SerializedValidation(const uint256& ledgerHash, const NewcoinAddress& naSeed, bool isFull); + SerializedValidation(const uint256& ledgerHash, uint64 closeTime, const NewcoinAddress& naSeed, bool isFull); uint256 getLedgerHash() const; + uint64 getCloseTime() const; NewcoinAddress getSignerPublic() const; bool isValid() const; bool isFull() const; @@ -33,7 +34,7 @@ public: CKey::pointer getSigningKey() const; uint256 getSigningHash() const; - void setTrusted() { mTrusted = true; } + void setTrusted() { mTrusted = true; } void addSigned(Serializer&) const; void addSignature(Serializer&) const; std::vector getSigned() const; diff --git a/src/ValidationCollection.cpp b/src/ValidationCollection.cpp index 15d6d92792..b30ea4a00f 100644 --- a/src/ValidationCollection.cpp +++ b/src/ValidationCollection.cpp @@ -2,12 +2,21 @@ #include "ValidationCollection.h" #include "Application.h" +#include "LedgerTiming.h" #include "Log.h" bool ValidationCollection::addValidation(SerializedValidation::pointer val) { - if(theApp->getUNL().nodeInUNL(val->getSignerPublic())) - val->setTrusted(); + bool isTrusted = false; + if (theApp->getUNL().nodeInUNL(val->getSignerPublic())) + { + uint64 now = theApp->getOPs().getNetworkTimeNC(); + uint64 valClose = val->getCloseTime(); + if ((now > valClose) && (now < (valClose + 2 * LEDGER_INTERVAL))) + isTrusted = true; + else + Log(lsWARNING) << "Received stale validation"; + } uint256 hash = val->getLedgerHash(); uint160 node = val->getSignerPublic().getNodeID(); @@ -16,6 +25,12 @@ bool ValidationCollection::addValidation(SerializedValidation::pointer val) boost::mutex::scoped_lock sl(mValidationLock); if (!mValidations[hash].insert(std::make_pair(node, val)).second) return false; + if (isTrusted) + { + boost::unordered_map::iterator it = mCurrentValidations.find(node); + if ((it == mCurrentValidations.end()) || (val->getCloseTime() >= it->second->getCloseTime())) + mCurrentValidations[node] = val; + } } Log(lsINFO) << "Val for " << hash.GetHex() << " from " << node.GetHex() << " added " << @@ -34,16 +49,24 @@ ValidationSet ValidationCollection::getValidations(const uint256& ledger) return ret; } -void ValidationCollection::getValidationCount(const uint256& ledger, int& trusted, int &untrusted) +void ValidationCollection::getValidationCount(const uint256& ledger, bool currentOnly, int& trusted, int &untrusted) { trusted = untrusted = 0; boost::mutex::scoped_lock sl(mValidationLock); boost::unordered_map::iterator it = mValidations.find(ledger); + uint64 now = theApp->getOPs().getNetworkTimeNC(); if (it != mValidations.end()) { for (ValidationSet::iterator vit = it->second.begin(), end = it->second.end(); vit != end; ++vit) { - if (vit->second->isTrusted()) + bool trusted = vit->second->isTrusted(); + if (trusted && currentOnly) + { + uint64 closeTime = vit->second->getCloseTime(); + if ((now < closeTime) || (now > (closeTime + 2 * LEDGER_INTERVAL))) + trusted = false; + } + if (trusted) ++trusted; else ++untrusted; diff --git a/src/ValidationCollection.h b/src/ValidationCollection.h index 3f41e0fc0f..62540f5853 100644 --- a/src/ValidationCollection.h +++ b/src/ValidationCollection.h @@ -16,13 +16,14 @@ class ValidationCollection boost::mutex mValidationLock; boost::unordered_map mValidations; + boost::unordered_map mCurrentValidations; public: ValidationCollection() { ; } bool addValidation(SerializedValidation::pointer); ValidationSet getValidations(const uint256& ledger); - void getValidationCount(const uint256& ledger, int& trusted, int& untrusted); + void getValidationCount(const uint256& ledger, bool currentOnly, int& trusted, int& untrusted); }; #endif From 3ccf91ffe0c4afdd668e6170421f34ae9393484f Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 13:37:01 -0700 Subject: [PATCH 2/9] Add parseIpPort, ADDRESS, & ADDRESS_SHARED to utils. --- src/utils.cpp | 37 +++++++++++++++++++++++++++++++++++-- src/utils.h | 12 ++++++++---- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/utils.cpp b/src/utils.cpp index d70caeb72c..327a991854 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -1,6 +1,9 @@ #include "utils.h" #include "uint256.h" +#include +#include + // // Time support // We have our own epoch. @@ -123,6 +126,36 @@ DH* DH_der_load(const std::string& strDer) return d2i_DHparams(NULL, &pbuf, strDer.size()); } +// +// IP Port parsing +// +// <-- iPort: "" = -1 +bool parseIpPort(const std::string& strSource, std::string& strIP, int& iPort) +{ + boost::smatch smMatch; + bool bValid = false; + + static boost::regex reEndpoint("\\`\\s*(\\S+)(?:\\s+(\\d+))?\\s*\\'"); + + if (boost::regex_match(strSource, smMatch, reEndpoint)) + { + boost::system::error_code err; + std::string strIPRaw = smMatch[1]; + std::string strPortRaw = smMatch[2]; + + boost::asio::ip::address addrIP = boost::asio::ip::address::from_string(strIPRaw, err); + + bValid = !err; + if (bValid) + { + strIP = addrIP.to_string(); + iPort = strPortRaw.empty() ? -1 : boost::lexical_cast(strPortRaw); + } + } + + return bValid; +} + /* void intIPtoStr(int ip,std::string& retStr) { @@ -130,7 +163,7 @@ void intIPtoStr(int ip,std::string& retStr) bytes[0] = ip & 0xFF; bytes[1] = (ip >> 8) & 0xFF; bytes[2] = (ip >> 16) & 0xFF; - bytes[3] = (ip >> 24) & 0xFF; + bytes[3] = (ip >> 24) & 0xFF; retStr=str(boost::format("%d.%d.%d.%d") % bytes[3] % bytes[2] % bytes[1] % bytes[0] ); } @@ -145,7 +178,7 @@ int strIPtoInt(std::string& ipStr) #include //#include "Winsock2.h" -//#include +//#include // from: http://stackoverflow.com/questions/3022552/is-there-any-standard-htonl-like-function-for-64-bits-integers-in-c // but we don't need to check the endianness uint64_t htobe64(uint64_t value) diff --git a/src/utils.h b/src/utils.h index a069c96fc7..302d5cfc37 100644 --- a/src/utils.h +++ b/src/utils.h @@ -8,9 +8,11 @@ #include "types.h" -#define nothing() do {} while (0) -#define fallthru() do {} while (0) -#define NUMBER(x) (sizeof(x)/sizeof((x)[0])) +#define nothing() do {} while (0) +#define fallthru() do {} while (0) +#define NUMBER(x) (sizeof(x)/sizeof((x)[0])) +#define ADDRESS(p) strHex(uint64( ((char*) p) - ((char*) 0))) +#define ADDRESS_SHARED(p) strHex(uint64( ((char*) (p).get()) - ((char*) 0))) #ifndef MAX #define MAX(x,y) ((x) < (y) ? (y) : (x)) @@ -36,7 +38,7 @@ int strIPtoInt(std::string& ipStr); template std::string strJoin(Iterator first, Iterator last, std::string strSeperator) { - std::ostringstream ossValues; + std::ostringstream ossValues; for (Iterator start = first; first != last; first++) { @@ -104,6 +106,8 @@ std::vector strUnHex(const std::string& strSrc); std::vector strCopy(const std::string& strSrc); std::string strCopy(const std::vector& vucSrc); +bool parseIpPort(const std::string& strSource, std::string& strIP, int& iPort); + DH* DH_der_load(const std::string& strDer); std::string DH_der_gen(int iKeyLength); From f0be92f4f0f7bcf8c4538a24b3811f57c380efa2 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 13:37:28 -0700 Subject: [PATCH 3/9] Make NewcoinAddress::setSeedGeneric more stringent. --- src/NewcoinAddress.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/NewcoinAddress.cpp b/src/NewcoinAddress.cpp index 49be00f061..2f24ef8c0d 100644 --- a/src/NewcoinAddress.cpp +++ b/src/NewcoinAddress.cpp @@ -716,7 +716,9 @@ bool NewcoinAddress::setSeedGeneric(const std::string& strText) if (strText.empty() || naTemp.setAccountID(strText) || naTemp.setAccountPublic(strText) - || naTemp.setAccountPrivate(strText)) + || naTemp.setAccountPrivate(strText) + || naTemp.setNodePublic(strText) + || naTemp.setNodePrivate(strText)) { bResult = false; } From 411d056377817858818fca7b41333e34192131ce Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 13:38:10 -0700 Subject: [PATCH 4/9] Add a default score to PeerIps. --- src/DBInit.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DBInit.cpp b/src/DBInit.cpp index 8c6931e3b6..51df2458ca 100644 --- a/src/DBInit.cpp +++ b/src/DBInit.cpp @@ -214,7 +214,7 @@ const char *WalletDBInit[] = { // Delay between scans. "CREATE TABLE PeerIps ( \ IpPort TEXT NOT NULL PRIMARY KEY, \ - Score INTEGER NOT NULL, \ + Score INTEGER NOT NULL DEFAULT 0, \ Source CHARACTER(1) NOT NULL, \ ScanNext DATETIME DEFAULT 0, \ ScanInterval INTEGER NOT NULL DEFAULT 0 \ From 71e6921a617536943dfd243ada03a5bfb3dd5611 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 13:39:14 -0700 Subject: [PATCH 5/9] Bootstrap [ips] from newcoind.cfg. --- src/UniqueNodeList.cpp | 102 ++++++++++++++++++++++++----------------- src/UniqueNodeList.h | 2 + 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/src/UniqueNodeList.cpp b/src/UniqueNodeList.cpp index 86b47ea1a9..74ceb647fd 100644 --- a/src/UniqueNodeList.cpp +++ b/src/UniqueNodeList.cpp @@ -4,6 +4,7 @@ #include "Application.h" #include "Conversion.h" #include "HttpsClient.h" +#include "Log.h" #include "ParseSection.h" #include "Serializer.h" #include "UniqueNodeList.h" @@ -608,12 +609,11 @@ void UniqueNodeList::processIps(const std::string& strSite, const NewcoinAddress { Database* db=theApp->getWalletDB()->getDB(); - std::string strEscNodePublic = db->escape(naNodePublic.humanNodePublic()); + std::string strEscNodePublic = sqlEscape(naNodePublic.humanNodePublic()); - std::cerr + Log(lsINFO) << str(boost::format("Validator: '%s' processing %d ips.") - % strSite % ( pmtVecStrIps ? pmtVecStrIps->size() : 0)) - << std::endl; + % strSite % ( pmtVecStrIps ? pmtVecStrIps->size() : 0)); // Remove all current Validator's entries in IpReferrals { @@ -631,32 +631,15 @@ void UniqueNodeList::processIps(const std::string& strSite, const NewcoinAddress int iValues = 0; BOOST_FOREACH(std::string strReferral, *pmtVecStrIps) { - boost::smatch smMatch; - std::string strIP; - int iPort; - bool bValid = false; - if (iValues == REFERRAL_VALIDATORS_MAX) break; - static boost::regex reEndpoint("\\`\\s*(\\S+)(?:\\s+(\\d+))?\\s*\\'"); + std::string strIP; + int iPort; + bool bValid = parseIpPort(strReferral, strIP, iPort); // XXX Filter out private network ips. // XXX http://en.wikipedia.org/wiki/Private_network - if (boost::regex_match(strReferral, smMatch, reEndpoint)) - { - boost::system::error_code err; - std::string strIPRaw = smMatch[1]; - std::string strPortRaw = smMatch[2]; - - iPort = strPortRaw.empty() ? -1 : boost::lexical_cast(strPortRaw); - - boost::asio::ip::address addrIP = boost::asio::ip::address::from_string(strIPRaw, err); - - bValid = !err; - if (bValid) - strIP = addrIP.to_string(); - } if (bValid) { @@ -733,10 +716,7 @@ int UniqueNodeList::processValidators(const std::string& strSite, const std::str if (!boost::regex_match(strReferral, smMatch, reReferral)) { - std::cerr - << str(boost::format("Validator: '%s' ["SECTION_VALIDATORS"]: rejecting line: '%s'") - % strSite % strReferral) - << std::endl; + Log(lsWARNING) << str(boost::format("Bad validator: syntax error: %s: %s") % strSite % strReferral); } else { @@ -744,27 +724,36 @@ int UniqueNodeList::processValidators(const std::string& strSite, const std::str std::string strComment = smMatch[2]; NewcoinAddress naValidator; - std::cerr << str(boost::format("Validator='%s', Comment='%s'") % strRefered % strComment) << std::endl; + if (naValidator.setSeedGeneric(strRefered)) + { - if (naValidator.setNodePublic(strRefered)) + Log(lsWARNING) << str(boost::format("Bad validator: domain or public key required: %s %s") % strRefered % strComment); + } + else if (naValidator.setNodePublic(strRefered)) { // A public key. // XXX Schedule for CAS lookup. nodeAddPublic(naValidator, vsWhy, strComment); + Log(lsINFO) << str(boost::format("Node Public: %s %s") % strRefered % strComment); + if (naNodePublic.isValid()) vstrValues.push_back(str(boost::format("('%s',%d,'%s')") % strNodePublic % iValues % naValidator.humanNodePublic())); + + iValues++; } else { // A domain: need to look it up. nodeAddDomain(strRefered, vsWhy, strComment); + Log(lsINFO) << str(boost::format("Node Domain: %s %s") % strRefered % strComment); + if (naNodePublic.isValid()) vstrValues.push_back(str(boost::format("('%s',%d,%s)") % strNodePublic % iValues % sqlEscape(strRefered))); - } - iValues++; + iValues++; + } } } @@ -1140,8 +1129,10 @@ int UniqueNodeList::iSourceScore(validatorSource vsWhy) switch (vsWhy) { case vsConfig: iScore = 1500; break; + case vsInbound: iScore = 0; break; case vsManual: iScore = 1500; break; case vsReferral: iScore = 0; break; + case vsTold: iScore = 0; break; case vsValidator: iScore = 1000; break; case vsWeb: iScore = 200; break; default: @@ -1222,7 +1213,7 @@ void UniqueNodeList::setSeedDomains(const seedDomain& sdSource, bool bNext) std::string strSql = str(boost::format("REPLACE INTO SeedDomains (Domain,PublicKey,Source,Next,Scan,Fetch,Sha256,Comment) VALUES (%s, %s, %s, %d, %d, %d, '%s', %s);") % db->escape(sdSource.strDomain) % (sdSource.naPublicKey.isValid() ? db->escape(sdSource.naPublicKey.humanNodePublic()) : "NULL") - % db->escape(std::string(1, static_cast(sdSource.vsSource))) + % sqlEscape(std::string(1, static_cast(sdSource.vsSource))) % iNext % iScan % iFetch @@ -1567,12 +1558,11 @@ void UniqueNodeList::nodeNetwork() void UniqueNodeList::nodeBootstrap() { - int iDomains = 0; - int iNodes = 0; + int iDomains = 0; + int iNodes = 0; + Database* db = theApp->getWalletDB()->getDB(); { - Database* db=theApp->getWalletDB()->getDB(); - ScopedLock sl(theApp->getWalletDB()->getDBLock()); if (db->executeSQL(str(boost::format("SELECT COUNT(*) AS Count FROM SeedDomains WHERE Source='%s' OR Source='%c';") % vsManual % vsValidator)) && db->startIterRows()) @@ -1587,7 +1577,7 @@ void UniqueNodeList::nodeBootstrap() // Always merge in the file specified in the config. if (!theConfig.UNL_DEFAULT.empty()) { - std::cerr << "Bootstrapping UNL: loading from unl_default." << std::endl; + Log(lsINFO) << "Bootstrapping UNL: loading from unl_default."; bLoaded = nodeLoad(theConfig.UNL_DEFAULT); } @@ -1595,7 +1585,7 @@ void UniqueNodeList::nodeBootstrap() // If never loaded anything try the current directory. if (!bLoaded && theConfig.UNL_DEFAULT.empty()) { - std::cerr << "Bootstrapping UNL: loading from '" VALIDATORS_FILE_NAME "'." << std::endl; + Log(lsINFO) << "Bootstrapping UNL: loading from '" VALIDATORS_FILE_NAME "'."; bLoaded = nodeLoad(VALIDATORS_FILE_NAME); } @@ -1605,7 +1595,7 @@ void UniqueNodeList::nodeBootstrap() { NewcoinAddress naInvalid; // Don't want a referrer on added entries. - std::cerr << "Bootstrapping UNL: loading from " CONFIG_FILE_NAME "." << std::endl; + Log(lsINFO) << "Bootstrapping UNL: loading from " CONFIG_FILE_NAME "."; if (processValidators("local", CONFIG_FILE_NAME, naInvalid, vsConfig, &theConfig.VALIDATORS)) bLoaded = true; @@ -1613,10 +1603,40 @@ void UniqueNodeList::nodeBootstrap() if (!bLoaded) { - std::cerr << "Bootstrapping UNL: loading from " << theConfig.VALIDATORS_SITE << "." << std::endl; + Log(lsINFO) << "Bootstrapping UNL: loading from " << theConfig.VALIDATORS_SITE << "."; nodeNetwork(); } + + if (!theConfig.IPS.empty()) + { + std::vector vstrValues; + + vstrValues.reserve(theConfig.IPS.size()); + + BOOST_FOREACH(const std::string& strPeer, theConfig.IPS) + { + std::string strIP; + int iPort; + + if (parseIpPort(strPeer, strIP, iPort)) + { + vstrValues.push_back(str(boost::format("(%s,'%c')") + % sqlEscape(str(boost::format("%s %d") % strIP % iPort)) + % static_cast(vsConfig))); + } + } + + if (!vstrValues.empty()) + { + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Source) VALUES %s;") + % strJoin(vstrValues.begin(), vstrValues.end(), ","))); + } + + fetchDirty(); + } } // Process a validators.txt. diff --git a/src/UniqueNodeList.h b/src/UniqueNodeList.h index 0e5e72bde8..1de853c960 100644 --- a/src/UniqueNodeList.h +++ b/src/UniqueNodeList.h @@ -33,8 +33,10 @@ class UniqueNodeList public: typedef enum { vsConfig = 'C', // newcoind.cfg + vsInbound = 'I', vsManual = 'M', vsReferral = 'R', + vsTold = 'T', vsValidator = 'V', // validators.txt vsWeb = 'W', } validatorSource; From a2c5b90fe9f3672d4fa36f41efdbaf30bc32cffd Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 13:40:03 -0700 Subject: [PATCH 6/9] Fixes and clean up for peer management. --- src/ConnectionPool.cpp | 300 ++++++++++++++++++++++++----------------- src/ConnectionPool.h | 16 +-- src/Peer.cpp | 155 ++++++++++++--------- src/Peer.h | 3 +- 4 files changed, 278 insertions(+), 196 deletions(-) diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 12373a0da6..ed7df37439 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -28,7 +28,6 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : mCtx(boost::asio::ssl::context::sslv23), - bScanning(false), mScanTimer(io_service), mPolicyTimer(io_service) { @@ -53,36 +52,29 @@ void ConnectionPool::start() bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) { // XXX Filter out other local addresses (like ipv6) - if (!theConfig.PEER_IP.empty() && theConfig.PEER_IP != "127.0.0.1") + Database* db = theApp->getWalletDB()->getDB(); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) ) { - addrs.push_back(str(boost::format("%s %d") % theConfig.PEER_IP % theConfig.PEER_PORT)); - } + std::string str; - { - Database* db = theApp->getWalletDB()->getDB(); - ScopedLock sl(theApp->getWalletDB()->getDBLock()); + db->getStr(0,str); - SQL_FOREACH(db, str(boost::format("SELECT IpPort FROM PeerIps LIMIT %d") % n) ) - { - std::string str; - - db->getStr(0,str); - - addrs.push_back(str); - } + addrs.push_back(str); } return true; } -bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code) +bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) { Database* db = theApp->getWalletDB()->getDB(); - std::string ipPort= sqlEscape(str(boost::format("%s %d") % strIp % iPort)); + std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); ScopedLock sl(theApp->getWalletDB()->getDBLock()); - std::string sql=str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); + std::string sql = str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); if (db->executeSQL(sql) && db->startIterRows()) { if ( db->getInt(0)==0) @@ -99,6 +91,9 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort,char code) return false; } +// An available peer is one we had no trouble connect to last time and that we are not currently knowingly connected or connecting +// too. +// // <-- true, if a peer is available to connect to bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) { @@ -108,6 +103,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) // Convert mIpMap (list of open connections) to a vector of " ". { boost::mutex::scoped_lock sl(mPeerLock); + vstrIpPort.reserve(mIpMap.size()); BOOST_FOREACH(pipPeer ipPeer, mIpMap) @@ -115,19 +111,22 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) const std::string& strIp = ipPeer.first.first; int iPort = ipPeer.first.second; - vstrIpPort.push_back(db->escape(str(boost::format("%s %d") % strIp % iPort))); + vstrIpPort.push_back(sqlEscape(str(boost::format("%s %d") % strIp % iPort))); } } // Get the first IpPort entry which is not in vector and which is not scheduled for scanning. std::string strIpPort; - ScopedLock sl(theApp->getWalletDB()->getDBLock()); - if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;") - % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) - && db->startIterRows()) { - db->getStr("IpPort", strIpPort); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + + if (db->executeSQL(str(boost::format("SELECT IpPort FROM PeerIps WHERE ScanNext IS NULL AND IpPort NOT IN (%s) LIMIT 1;") + % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) + && db->startIterRows()) + { + db->getStr("IpPort", strIpPort); + } } bool bAvailable = !strIpPort.empty(); @@ -138,6 +137,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) return bAvailable; } +// Make sure we have at least low water connections. void ConnectionPool::policyLowWater() { std::string strIp; @@ -166,7 +166,7 @@ void ConnectionPool::policyLowWater() { // Try to start connection. if (!peerConnect(strIp, iPort)) - throw std::runtime_error("Internal error: standby was already connected."); + Log(lsINFO) << "policyLowWater was already connected."; // Check if we need more. policyLowWater(); @@ -175,24 +175,14 @@ void ConnectionPool::policyLowWater() void ConnectionPool::policyEnforce() { - boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); - - Log(lsTRACE) << "policyEnforce: begin: " << tpNow; - - // Cancel any in progrss timer. + // Cancel any in progress timer. (void) mPolicyTimer.cancel(); // Enforce policies. policyLowWater(); // Schedule next enforcement. - boost::posix_time::ptime tpNext; - - tpNext = boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS); - - Log(lsTRACE) << "policyEnforce: schedule : " << tpNext; - - mPolicyTimer.expires_at(tpNext); + mPolicyTimer.expires_at(boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS)); mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1)); } @@ -212,6 +202,8 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) } } +// YYY: Should probably do this in the background. +// YYY: Might end up sending to disconnected peer? void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) { boost::mutex::scoped_lock sl(mPeerLock); @@ -226,18 +218,18 @@ void ConnectionPool::relayMessage(Peer* fromPeer, PackedMessage::pointer msg) } } +// Schedule a connection via scanning. +// // Add or modify into PeerIps as a manual entry for immediate scanning. // Requires sane IP and port. void ConnectionPool::connectTo(const std::string& strIp, int iPort) { - Database* db = theApp->getWalletDB()->getDB(); - std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); - { + Database* db = theApp->getWalletDB()->getDB(); ScopedLock sl(theApp->getWalletDB()->getDBLock()); db->executeSQL(str(boost::format("REPLACE INTO PeerIps (IpPort,Score,Source,ScanNext) values (%s,%d,'%c',0);") - % ipPort + % sqlEscape(str(boost::format("%s %d") % strIp % iPort)) % theApp->getUNL().iSourceScore(UniqueNodeList::vsManual) % char(UniqueNodeList::vsManual))); } @@ -245,46 +237,52 @@ void ConnectionPool::connectTo(const std::string& strIp, int iPort) scanRefresh(); } +// Start a connection, if not already known connected or connecting. +// // <-- true, if already connected. -bool ConnectionPool::peerConnect(const std::string& strIp, int iPort) +Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort) { - bool bConnecting; - ipPort ip = make_pair(strIp, iPort); + ipPort pipPeer = make_pair(strIp, iPort); + Peer::pointer ppResult = Peer::pointer(); boost::unordered_map::iterator it; - boost::mutex::scoped_lock sl(mPeerLock); - - it = mIpMap.find(ip); - - if (it == mIpMap.end()) { - // Did not find it. Not already connecting or connected. - std::cerr << "ConnectionPool::peerConnect: Connecting: " - << strIp << " " << iPort << std::endl; + boost::mutex::scoped_lock sl(mPeerLock); - Peer::pointer peer(Peer::create(theApp->getIOService(), mCtx)); + if ((it = mIpMap.find(pipPeer)) == mIpMap.end()) + { + Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx)); - mIpMap[ip] = peer; + // Did not find it. Not already connecting or connected. + ppNew->connect(strIp, iPort); - peer->connect(strIp, iPort); + mIpMap[pipPeer] = ppNew; - // ++miConnectStarting; + ppResult = ppNew; + // ++miConnectStarting; + } + else + { + // Found it. Already connected. - bConnecting = true; + nothing(); + } + } + + if (ppResult) + { + Log(lsINFO) << "Pool: Connecting: " << ADDRESS_SHARED(ppResult) << ": " << strIp << " " << iPort; } else { - // Found it. Already connected. - std::cerr << "ConnectionPool::peerConnect: Already connected: " - << strIp << " " << iPort << std::endl; - - bConnecting = false; + Log(lsINFO) << "Pool: Already connected: " << strIp << " " << iPort; } - return bConnecting; + return ppResult; } +// Returns information on verified peers. Json::Value ConnectionPool::getPeersJson() { Json::Value ret(Json::arrayValue); @@ -316,18 +314,16 @@ std::vector ConnectionPool::getPeerVector() } // Now know peer's node public key. Determine if we want to stay connected. +// <-- bNew: false = redundant bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naPeer, const std::string& strIP, int iPort) { - bool bSuccess; + bool bNew = false; - std::cerr << "ConnectionPool::peerConnected: " - << naPeer.humanNodePublic() << " " << strIP << " " << iPort << std::endl; assert(!!peer); if (naPeer == theApp->getWallet().getNodePublic()) { - std::cerr << "ConnectionPool::peerConnected: To self." << std::endl; - bSuccess = false; + Log(lsINFO) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; } else { @@ -337,31 +333,48 @@ bool ConnectionPool::peerConnected(Peer::pointer peer, const NewcoinAddress& naP if (itCm == mConnectedMap.end()) { // New connection. + Log(lsINFO) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; + mConnectedMap[naPeer] = peer; - bSuccess = true; + bNew = true; + } + // Found in map, already connected. + else if (!strIP.empty()) + { + // Was an outbound connection, we know IP and port. + // Note in previous connection how to reconnect. + if (itCm->second->getIP().empty()) + { + // Old peer did not know it's IP. + Log(lsINFO) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort; + + itCm->second->setIpPort(strIP, iPort); + + // Add old connection to identified connection list. + mIpMap[make_pair(strIP, iPort)] = itCm->second; + } + else + { + // Old peer knew its IP. Do nothing. + Log(lsINFO) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; + + nothing(); + } } else { - // Found in map, already connected. - if (!strIP.empty()) - { - // Was an outbound connection, we know IP and port. - // Note in previous connection how to reconnect. - itCm->second->peerIpPort(strIP, iPort); - } + Log(lsINFO) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; - bSuccess = false; // Don't need a redundant connection. + nothing(); } } - return bSuccess; + return bNew; } -// We maintain a map of public key to peer for connectted and verified peers. Maintain it. +// We maintain a map of public key to peer for connected and verified peers. Maintain it. void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer) { - std::cerr << "ConnectionPool::peerDisconnected: " << peer->getIP() << " " << peer->getPort() << std::endl; - if (naPeer.isValid()) { boost::unordered_map::iterator itCm; @@ -373,22 +386,36 @@ void ConnectionPool::peerDisconnected(Peer::pointer peer, const NewcoinAddress& if (itCm == mConnectedMap.end()) { // Did not find it. Not already connecting or connected. - std::cerr << "Internal Error: peer connection was inconsistent." << std::endl; - // XXX Bad error. + Log(lsWARNING) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent."; + // XXX Maybe bad error, considering we have racing connections, may not so bad. + } + else if (itCm->second != peer) + { + Log(lsWARNING) << "Pool: disconected: non canonical entry"; + + nothing(); } else { // Found it. Delete it. mConnectedMap.erase(itCm); + + Log(lsINFO) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort(); } } + else + { + Log(lsINFO) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort(); + } } -void ConnectionPool::peerScanSet(const std::string& strIp, int iPort) +// Schedule for immediate scanning, if not already scheduled. +// +// <-- true, scanRefresh needed. +bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) { - std::cerr << "ConnectionPool::peerScanSet: " << strIp << " " << iPort << std::endl; - std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); + bool bScanDirty = false; ScopedLock sl(theApp->getWalletDB()->getDBLock()); Database* db = theApp->getWalletDB()->getDB(); @@ -404,38 +431,48 @@ void ConnectionPool::peerScanSet(const std::string& strIp, int iPort) boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); - std::cerr << str(boost::format("peerScanSet: scan schedule: %s %s (next %s, delay=%s)") - % mScanIp % mScanPort % tpNext % iInterval) << std::endl; + Log(lsINFO) << str(boost::format("Scanning: schedule create: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") % iToSeconds(tpNext) % iInterval % db->escape(strIpPort))); + + bScanDirty = true; } else { - // Scanning connection terminate, already scheduled for retry. - nothing(); + // Scanning connection terminated, already scheduled for retry. + boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); + boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext")); + int iInterval = (tpNext-tpNow).seconds(); + + Log(lsINFO) << str(boost::format("Scanning: schedule exists: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval); } } else { - std::cerr << "peerScanSet: peer wasn't in PeerIps: " << strIp << " " << iPort << std::endl; + Log(lsWARNING) << "Scanning: peer wasn't in PeerIps: " << strIp << " " << iPort; } + + return bScanDirty; } -void ConnectionPool::peerFailed(const std::string& strIp, int iPort) +// --> strIp: not empty +void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, int iPort) { - std::cerr << "ConnectionPool::peerFailed: " << strIp << " " << iPort << std::endl; - ipPort ipPeer = make_pair(strIp, iPort); + ipPort ipPeer = make_pair(strIp, iPort); + bool bScanRefresh = false; - // If the fail was our scan, we are no longer scanning. - if (bScanning && !mScanIp.compare(strIp) && mScanPort == iPort) + // If the connecttion was our scan, we are no longer scanning. + if (mScanning && mScanning == peer) { - bScanning = false; + Log(lsINFO) << "Scanning: scan fail: " << strIp << " " << iPort; - // Look for more to scan. - scanRefresh(); + mScanning = Peer::pointer(); // No longer scanning. + bScanRefresh = true; // Look for more to scan. } bool bScanSet = false; @@ -449,33 +486,47 @@ void ConnectionPool::peerFailed(const std::string& strIp, int iPort) if (itIp == mIpMap.end()) { // Did not find it. Not already connecting or connected. - std::cerr << "Internal Error: peer wasn't connected: " - << ipPeer.first << " " << ipPeer.second << std::endl; - // XXX Bad error. + Log(lsWARNING) << "Pool: Disconnect: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + // XXX Internal error. + } + else if (mIpMap[ipPeer] == peer) + { + // We were the identified connection. + Log(lsINFO) << "Pool: Disconnect: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + + // Delete our entry. + mIpMap.erase(itIp); + + // We want to connect again. + bScanSet = true; } else { - // Found it. Delete it. - mIpMap.erase(itIp); - - bScanSet = true; + // Found it. But, we were redundent. + Log(lsINFO) << "Pool: Disconnect: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; } } if (bScanSet) { - // Schedule for scanning. - peerScanSet(ipPeer.first, ipPeer.second); + // Since we disconnnected, try to schedule for scanning again. + bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second); } + + if (bScanRefresh) + scanRefresh(); } -void ConnectionPool::peerVerified(const std::string& strIp, int iPort) +void ConnectionPool::peerVerified(Peer::pointer peer) { - if (bScanning && !mScanIp.compare(strIp), mScanPort == iPort) + if (mScanning && mScanning == peer) { + std::string strIp = peer->getIP(); + int iPort = peer->getPort(); + std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); - std::cerr << str(boost::format("peerVerified: %s %s (scan off)") % mScanIp % mScanPort) << std::endl; + Log(lsINFO) << str(boost::format("Scanning: connected: %s %s (scan off)") % strIp % iPort); // Scan completed successfully. { @@ -487,8 +538,9 @@ void ConnectionPool::peerVerified(const std::string& strIp, int iPort) // XXX Check error. } - bScanning = false; - scanRefresh(); + mScanning = Peer::pointer(); + + scanRefresh(); // Continue scanning. } } @@ -511,10 +563,10 @@ void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) // Scan ips as per db entries. void ConnectionPool::scanRefresh() { - if (bScanning) + if (mScanning) { // Currently scanning, will scan again after completion. - std::cerr << "scanRefresh: already scanning" << std::endl; + Log(lsTRACE) << "Scanning: already scanning"; nothing(); } @@ -527,8 +579,8 @@ void ConnectionPool::scanRefresh() int iInterval; { - ScopedLock sl(theApp->getWalletDB()->getDBLock()); - Database *db=theApp->getWalletDB()->getDB(); + ScopedLock sl(theApp->getWalletDB()->getDBLock()); + Database* db = theApp->getWalletDB()->getDB(); if (db->executeSQL("SELECT * FROM PeerIps INDEXED BY PeerScanIndex WHERE ScanNext NOT NULL ORDER BY ScanNext LIMIT 1;") && db->startIterRows()) @@ -551,7 +603,7 @@ void ConnectionPool::scanRefresh() if (tpNow.is_not_a_date_time()) { - std::cerr << "scanRefresh: no scan needed." << std::endl; + Log(lsINFO) << "Scanning: stop."; (void) mScanTimer.cancel(); } @@ -562,14 +614,13 @@ void ConnectionPool::scanRefresh() (void) mScanTimer.cancel(); - bScanning = true; - - iInterval *= 2; + // XXX iInterval *= 2; + iInterval = 0; iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); tpNext = tpNow + boost::posix_time::seconds(iInterval); - std::cerr << str(boost::format("scanRefresh: now scanning: %s %s (next %s, delay=%s)") + Log(lsTRACE) << str(boost::format("Scanning: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval) << std::endl; { @@ -583,7 +634,8 @@ void ConnectionPool::scanRefresh() // XXX Check error. } - if (!peerConnect(mScanIp, mScanPort)) + mScanning = peerConnect(mScanIp, mScanPort); + if (!mScanning) { // Already connected. Try again. scanRefresh(); @@ -591,7 +643,7 @@ void ConnectionPool::scanRefresh() } else { - std::cerr << "scanRefresh: next due: " << tpNow << std::endl; + Log(lsINFO) << "Scanning: next: " << tpNow; mScanTimer.expires_at(tpNext); mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 41a7182ea2..c71c59fef6 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -32,24 +32,24 @@ private: boost::asio::ssl::context mCtx; - bool bScanning; + Peer::pointer mScanning; boost::asio::deadline_timer mScanTimer; std::string mScanIp; int mScanPort; - void scanHandler(const boost::system::error_code& ecResult); + void scanHandler(const boost::system::error_code& ecResult); boost::asio::deadline_timer mPolicyTimer; - void policyHandler(const boost::system::error_code& ecResult); + void policyHandler(const boost::system::error_code& ecResult); // Peers we are establishing a connection with as a client. // int miConnectStarting; - bool peerAvailable(std::string& strIp, int& iPort); - void peerScanSet(const std::string& strIp, int iPort); + bool peerAvailable(std::string& strIp, int& iPort); + bool peerScanSet(const std::string& strIp, int iPort); - bool peerConnect(const std::string& strIp, int iPort); + Peer::pointer peerConnect(const std::string& strIp, int iPort); public: ConnectionPool(boost::asio::io_service& io_service); @@ -78,10 +78,10 @@ public: void peerDisconnected(Peer::pointer peer, const NewcoinAddress& naPeer); // As client accepted. - void peerVerified(const std::string& strIp, int iPort); + void peerVerified(Peer::pointer peer); // As client failed connect and be accepted. - void peerFailed(const std::string& strIp, int iPort); + void peerClosed(Peer::pointer peer, const std::string& strIp, int iPort); Json::Value getPeersJson(); std::vector getPeerVector(); diff --git a/src/Peer.cpp b/src/Peer.cpp index f90b2935a4..9839ed2f6a 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -27,14 +27,15 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) mSocketSsl(io_service, ctx), mVerifyTimer(io_service) { + // Log(lsDEBUG) << "CREATING PEER: " << ADDRESS(this); } void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { + if (error) + Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; #ifdef DEBUG - if(error) - std::cerr << "Peer::handle_write Error: " << error << " bytes: " << bytes_transferred << std::endl; -// else +// if (!error) // std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; #endif @@ -57,14 +58,21 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra } } +void Peer::setIpPort(const std::string& strIP, int iPort) +{ + mIpPort = make_pair(strIP, iPort); + + Log(lsDEBUG) << "Peer: Set: " + << ADDRESS(this) << "> " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); +} + void Peer::detach(const char *rsn) { -#ifdef DEBUG - Log(lsTRACE) << "DETACHING PEER: " << rsn - << ": " - << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") - << " " << getIP() << " " << getPort() << std::endl; -#endif + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "> " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); boost::system::error_code ecCancel; @@ -72,6 +80,12 @@ void Peer::detach(const char *rsn) mSendQ.clear(); + // We may close more than once. + boost::system::error_code ecShutdown; + getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); + + getSocket().close(); + if (mNodePublic.isValid()) { theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); @@ -82,11 +96,16 @@ void Peer::detach(const char *rsn) if (!mIpPort.first.empty()) { // Connection might be part of scanning. Inform connect failed. - // Might need to scan. Inform connection disconnected. - theApp->getConnectionPool().peerFailed(mIpPort.first, mIpPort.second); + // Might need to scan. Inform connection closed. + theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); - mIpPort.first.empty(); // Be idompotent. + mIpPort.first.clear(); // Be idompotent. } + + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "< " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) @@ -100,14 +119,15 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) } else if (ecResult) { - std::cerr << "Peer verify timer error: " << std::endl; + Log(lsINFO) << "Peer verify timer error"; // Can't do anything sound. abort(); } else { - std::cerr << "Peer failed to verify in time." << std::endl; + Log(lsINFO) << "Peer: Verify: Peer failed to verify in time."; + detach("hvt"); } } @@ -120,8 +140,8 @@ void Peer::connect(const std::string strIp, int iPort) mClientConnect = true; - std::cerr << "Peer::connect: " << strIp << " " << iPort << std::endl; - mIpPort = make_pair(strIp, iPort); + mIpPort = make_pair(strIp, iPort); + mIpPortConnect = mIpPort; assert(!mIpPort.first.empty()); boost::asio::ip::tcp::resolver::query query(strIp, boost::lexical_cast(iPortAct), @@ -132,7 +152,7 @@ void Peer::connect(const std::string strIp, int iPort) if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator()) { - std::cerr << "Peer::connect: Bad IP" << std::endl; + Log(lsWARNING) << "Peer: Connect: Bad IP: " << strIp; detach("c"); return; } @@ -143,7 +163,7 @@ void Peer::connect(const std::string strIp, int iPort) if (err) { - std::cerr << "Peer::connect: Failed to set timer." << std::endl; + Log(lsWARNING) << "Peer: Connect: Failed to set timer."; detach("c2"); return; } @@ -151,10 +171,10 @@ void Peer::connect(const std::string strIp, int iPort) if (!err) { - std::cerr << "Peer::connect: Connecting: " << mIpPort.first << " " << mIpPort.second << std::endl; + Log(lsINFO) << "Peer: Connect: Outbound: " << ADDRESS(this) << ": " << mIpPort.first << " " << mIpPort.second; boost::asio::async_connect( - mSocketSsl.lowest_layer(), + getSocket(), itrEndpoint, boost::bind( &Peer::handleConnect, @@ -172,7 +192,7 @@ void Peer::handleStart(const boost::system::error_code& error) { if (error) { - std::cerr << "Peer::handleStart: failed:" << error << std::endl; + Log(lsINFO) << "Peer: Handshake: Error: " << error.category().name() << ": " << error.message() << ": " << error; detach("hs"); } else @@ -187,7 +207,7 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip { if (error) { - std::cerr << "Connect peer: failed:" << error << std::endl; + Log(lsINFO) << "Peer: Connect: Error: " << error.category().name() << ": " << error.message() << ": " << error; detach("hc"); } else @@ -205,25 +225,26 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip // - We don't bother remembering the inbound IP or port. Only useful for debugging. void Peer::connected(const boost::system::error_code& error) { - boost::asio::ip::tcp::endpoint ep = mSocketSsl.lowest_layer().remote_endpoint(); + boost::asio::ip::tcp::endpoint ep = getSocket().remote_endpoint(); int iPort = ep.port(); std::string strIp = ep.address().to_string(); mClientConnect = false; + mIpPortConnect = make_pair(strIp, iPort); if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this? iPort = -1; if (error) { - std::cerr << "Remote peer: accept error: " << strIp << " " << iPort << " : " << error << std::endl; + Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; detach("ctd"); } else { - // Not redundant ip and port, add to connection list. + // Not redundant ip and port, handshake, and start. - std::cerr << "Remote peer: accepted: " << strIp << " " << iPort << std::endl; + Log(lsINFO) << "Peer: Inbound: Accepted: " << ADDRESS(this) << ": " << strIp << " " << iPort; mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); @@ -290,8 +311,8 @@ void Peer::handle_read_header(const boost::system::error_code& error) } else { + Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrh2"); - std::cerr << "Peer::handle_read_header: Error: " << error << std::endl; } } @@ -304,8 +325,8 @@ void Peer::handle_read_body(const boost::system::error_code& error) } else { + Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrb"); - std::cerr << "Peer::handle_read_body: Error: " << error << std::endl; } } @@ -517,27 +538,25 @@ void Peer::processReadBuffer() void Peer::recvHello(newcoin::TMHello& packet) { #ifdef DEBUG - std::cerr << "Recv(Hello) v=" << packet.version() - << ", index=" << packet.ledgerindex() - << std::endl; + Log(lsINFO) << "Recv(Hello) v=" << packet.version() << ", index=" << packet.ledgerindex(); #endif bool bDetach = true; if (!mNodePublic.setNodePublic(packet.nodepublic())) { - std::cerr << "Recv(Hello): Disconnect: Bad node public key." << std::endl; + Log(lsINFO) << "Recv(Hello): Disconnect: Bad node public key."; } else if (!mNodePublic.verifyNodePublic(mCookieHash, packet.nodeproof())) { // Unable to verify they have private key for claimed public key. - std::cerr << "Recv(Hello): Disconnect: Failed to verify session." << std::endl; + Log(lsINFO) << "Recv(Hello): Disconnect: Failed to verify session."; } else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) { // Already connected, self, or some other reason. - std::cerr << "Recv(Hello): Disconnect: Extraneous connection." << std::endl; + Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection."; } else { // Successful connection. - std::cerr << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic() << std::endl; + Log(lsINFO) << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic(); // Cancel verification timeout. (void) mVerifyTimer.cancel(); @@ -545,16 +564,18 @@ void Peer::recvHello(newcoin::TMHello& packet) if (mClientConnect) { // If we connected due to scan, no longer need to scan. - theApp->getConnectionPool().peerVerified(mIpPort.first, mIpPort.second); + theApp->getConnectionPool().peerVerified(shared_from_this()); // No longer connecting as client. mClientConnect = false; } else { - // At this point we could add the inbound connection to our IP list. However, the inbound IP address might be that of - // a NAT. It would be best to only add it if and only if we can immediately verify it. - nothing(); + // Take a guess at remotes address. + std::string strIP = getSocket().remote_endpoint().address().to_string(); + int iPort = packet.ipv4port(); + + theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound); } // Consider us connected. No longer accepting mtHELLO. @@ -580,8 +601,10 @@ void Peer::recvHello(newcoin::TMHello& packet) mNodePublic.clear(); detach("recvh"); } - - sendGetPeers(); + else + { + sendGetPeers(); + } } void Peer::recvTransaction(newcoin::TMTransaction& packet) @@ -714,27 +737,27 @@ void Peer::recvGetPeers(newcoin::TMGetPeers& packet) { std::vector addrs; - theApp->getConnectionPool().getTopNAddrs(30,addrs); + theApp->getConnectionPool().getTopNAddrs(30, addrs); - if (addrs.size()) + if (!addrs.empty()) { newcoin::TMPeers peers; - for(int n=0; nset_ipv4(inet_addr(strIP.c_str())); - addr->set_ipv4port(port); + addr->set_ipv4port(iPort); - std::cout << "Teaching about: " << strIP << std::endl; + Log(lsINFO) << "Peer: Teaching: " << ADDRESS(this) << ": " << n << ": " << strIP << " " << iPort; } - PackedMessage::pointer message = boost::make_shared(peers, newcoin::mtPEERS); sendPacket(message); } @@ -746,20 +769,17 @@ void Peer::recvPeers(newcoin::TMPeers& packet) for (int i = 0; i < packet.nodes().size(); ++i) { in_addr addr; - addr.s_addr=packet.nodes(i).ipv4(); - std::string strIP(inet_ntoa(addr)); - int port=packet.nodes(i).ipv4port(); - if (strIP == "0.0.0.0") + addr.s_addr = packet.nodes(i).ipv4(); + + std::string strIP(inet_ntoa(addr)); + int iPort = packet.nodes(i).ipv4port(); + + if (strIP != "0.0.0.0" && strIP != "127.0.0.1") { - strIP = mSocketSsl.lowest_layer().remote_endpoint().address().to_string(); - } + Log(lsINFO) << "Peer: Learning: " << ADDRESS(this) << ": " << i << ": " << strIP << " " << iPort; - // if (strIP != "127.0.0.1") - { - std::cout << "Learning about: " << strIP << std::endl; - - theApp->getConnectionPool().savePeer(strIP, port, 'T'); + theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsTold); } } } @@ -1059,12 +1079,14 @@ void Peer::sendGetPeers() { // get other peers this guy knows about newcoin::TMGetPeers getPeers; + getPeers.set_doweneedthis(1); + PackedMessage::pointer packet = boost::make_shared(getPeers, newcoin::mtGET_PEERS); + sendPacket(packet); } - void Peer::punishPeer(PeerPunish) { } @@ -1072,9 +1094,16 @@ void Peer::punishPeer(PeerPunish) Json::Value Peer::getJson() { Json::Value ret(Json::objectValue); - ret["ip"] = mIpPort.first; - ret["port"] = mIpPort.second; + ret["this"] = ADDRESS(this); ret["public_key"] = mNodePublic.ToString(); + ret["ip"] = mIpPortConnect.first; + ret["port"] = mIpPortConnect.second; + + if (!mIpPort.first.empty()) + { + ret["verified_ip"] = mIpPort.first; + ret["verified_port"] = mIpPort.second; + } return ret; } diff --git a/src/Peer.h b/src/Peer.h index 99c0204af2..e707ec28ce 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -34,6 +34,7 @@ private: bool mConnected; // True, if hello accepted. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; + ipPort mIpPortConnect; uint256 mCookieHash; // network state information @@ -99,7 +100,7 @@ public: std::string& getIP() { return mIpPort.first; } int getPort() { return mIpPort.second; } - void peerIpPort(const std::string& strIP, int iPort) { mIpPort = make_pair(strIP, iPort); } + void setIpPort(const std::string& strIP, int iPort); static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) { From 94bc059f87318337e8dfa20f2ca675a0c22c6271 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 20 Jun 2012 13:40:10 -0700 Subject: [PATCH 7/9] Fix some mishandling of genesis block close timing. --- src/Ledger.cpp | 10 ++++++++-- src/Ledger.h | 2 ++ src/LedgerConsensus.cpp | 1 + src/ValidationCollection.cpp | 2 +- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/Ledger.cpp b/src/Ledger.cpp index 81e610c33d..5549c23433 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -463,12 +463,18 @@ void Ledger::setCloseTime(boost::posix_time::ptime ptm) mCloseTime = iToSeconds(ptm); } +uint64 Ledger::sGenesisClose = 0; + uint64 Ledger::getNextLedgerClose() const { if (mCloseTime == 0) { - uint64 closeTime = theApp->getOPs().getNetworkTimeNC() + mLedgerInterval - 1; - return closeTime - (closeTime % mLedgerInterval); + if (sGenesisClose == 0) + { + uint64 closeTime = theApp->getOPs().getNetworkTimeNC() + mLedgerInterval - 1; + sGenesisClose = closeTime - (closeTime % mLedgerInterval); + } + return sGenesisClose; } return mCloseTime + mLedgerInterval; } diff --git a/src/Ledger.h b/src/Ledger.h index 1f9c8b6c60..598c148b2d 100644 --- a/src/Ledger.h +++ b/src/Ledger.h @@ -67,6 +67,8 @@ private: uint16 mLedgerInterval; bool mClosed, mValidHash, mAccepted, mImmutable; + static uint64 sGenesisClose; + SHAMap::pointer mTransactionMap, mAccountStateMap; mutable boost::recursive_mutex mLock; diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index fc6a798c0e..30c1b404f1 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -824,6 +824,7 @@ void LedgerConsensus::accept(SHAMap::pointer set) if (mValidating) { + assert (theApp->getOPs().getNetworkTimeNC() > newLCL->getCloseTimeNC()); SerializedValidation::pointer v = boost::make_shared (newLCLHash, newLCL->getCloseTimeNC(), mOurPosition->peekSeed(), true); v->setTrusted(); diff --git a/src/ValidationCollection.cpp b/src/ValidationCollection.cpp index b30ea4a00f..59b725f0d7 100644 --- a/src/ValidationCollection.cpp +++ b/src/ValidationCollection.cpp @@ -15,7 +15,7 @@ bool ValidationCollection::addValidation(SerializedValidation::pointer val) if ((now > valClose) && (now < (valClose + 2 * LEDGER_INTERVAL))) isTrusted = true; else - Log(lsWARNING) << "Received stale validation"; + Log(lsWARNING) << "Received stale validation now=" << now << ", close=" << valClose; } uint256 hash = val->getLedgerHash(); From 40912499c71a4ec29690700b2dfafbae77e62fe7 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 20 Jun 2012 13:52:29 -0700 Subject: [PATCH 8/9] Properly set validations to trusted. --- src/ValidationCollection.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/ValidationCollection.cpp b/src/ValidationCollection.cpp index 59b725f0d7..5061d0d131 100644 --- a/src/ValidationCollection.cpp +++ b/src/ValidationCollection.cpp @@ -7,15 +7,16 @@ bool ValidationCollection::addValidation(SerializedValidation::pointer val) { - bool isTrusted = false; + bool isCurrent = false; if (theApp->getUNL().nodeInUNL(val->getSignerPublic())) { + val->setTrusted(); uint64 now = theApp->getOPs().getNetworkTimeNC(); uint64 valClose = val->getCloseTime(); if ((now > valClose) && (now < (valClose + 2 * LEDGER_INTERVAL))) - isTrusted = true; + isCurrent = true; else - Log(lsWARNING) << "Received stale validation now=" << now << ", close=" << valClose; + Log(lsWARNING) << "Received stale validation now=" << now << ", close=" << valClose; } uint256 hash = val->getLedgerHash(); @@ -25,7 +26,7 @@ bool ValidationCollection::addValidation(SerializedValidation::pointer val) boost::mutex::scoped_lock sl(mValidationLock); if (!mValidations[hash].insert(std::make_pair(node, val)).second) return false; - if (isTrusted) + if (isCurrent) { boost::unordered_map::iterator it = mCurrentValidations.find(node); if ((it == mCurrentValidations.end()) || (val->getCloseTime() >= it->second->getCloseTime())) @@ -33,8 +34,8 @@ bool ValidationCollection::addValidation(SerializedValidation::pointer val) } } - Log(lsINFO) << "Val for " << hash.GetHex() << " from " << node.GetHex() << " added " << - (val->isTrusted() ? "trusted" : "UNtrusted"); + Log(lsINFO) << "Val for " << hash.GetHex() << " from " << val->getSignerPublic().humanNodePublic() + << " added " << (val->isTrusted() ? "trusted" : "UNtrusted"); return true; } From efa38ea72b5d7ec0c3b3b61e8013c130678f4ec6 Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Wed, 20 Jun 2012 15:52:59 -0700 Subject: [PATCH 9/9] More clean up of connection logic. --- src/ConnectionPool.cpp | 69 ++++++++++------- src/Peer.cpp | 171 ++++++++++++++++++++++------------------- src/Peer.h | 1 + 3 files changed, 134 insertions(+), 107 deletions(-) diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index ed7df37439..a75220930f 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -69,6 +69,8 @@ bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) { + bool bNew = false; + Database* db = theApp->getWalletDB()->getDB(); std::string ipPort = sqlEscape(str(boost::format("%s %d") % strIp % iPort)); @@ -77,10 +79,10 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) std::string sql = str(boost::format("SELECT COUNT(*) FROM PeerIps WHERE IpPort=%s;") % ipPort); if (db->executeSQL(sql) && db->startIterRows()) { - if ( db->getInt(0)==0) + if (!db->getInt(0)) { db->executeSQL(str(boost::format("INSERT INTO PeerIps (IpPort,Score,Source) values (%s,0,'%c');") % ipPort % code)); - return true; + bNew = true; }// else we already had this peer } else @@ -88,7 +90,10 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) std::cout << "Error saving Peer" << std::endl; } - return false; + if (bNew) + scanRefresh(); + + return bNew; } // An available peer is one we had no trouble connect to last time and that we are not currently knowingly connected or connecting @@ -125,7 +130,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) % strJoin(vstrIpPort.begin(), vstrIpPort.end(), ","))) && db->startIterRows()) { - db->getStr("IpPort", strIpPort); + strIpPort = db->getStrBinary("IpPort"); } } @@ -147,6 +152,8 @@ void ConnectionPool::policyLowWater() if (mConnectedMap.size() > theConfig.PEER_CONNECT_LOW_WATER) { // Above low water mark, don't need more connections. + Log(lsTRACE) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER; + nothing(); } #if 0 @@ -159,14 +166,18 @@ void ConnectionPool::policyLowWater() else if (!peerAvailable(strIp, iPort)) { // No more connections available to start. + Log(lsTRACE) << "Pool: Low water: no peers available."; + // XXX Might ask peers for more ips. nothing(); } else { // Try to start connection. + Log(lsTRACE) << "Pool: Low water: start connection."; + if (!peerConnect(strIp, iPort)) - Log(lsINFO) << "policyLowWater was already connected."; + Log(lsINFO) << "Pool: Low water: already connected."; // Check if we need more. policyLowWater(); @@ -431,7 +442,7 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); - Log(lsINFO) << str(boost::format("Scanning: schedule create: %s %s (next %s, delay=%s)") + Log(lsINFO) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") @@ -443,18 +454,18 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) } else { - // Scanning connection terminated, already scheduled for retry. + // Scan connection terminated, already scheduled for retry. boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext")); int iInterval = (tpNext-tpNow).seconds(); - Log(lsINFO) << str(boost::format("Scanning: schedule exists: %s %s (next %s, delay=%s)") + Log(lsINFO) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%s)") % mScanIp % mScanPort % tpNext % iInterval); } } else { - Log(lsWARNING) << "Scanning: peer wasn't in PeerIps: " << strIp << " " << iPort; + Log(lsWARNING) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort; } return bScanDirty; @@ -466,17 +477,17 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in ipPort ipPeer = make_pair(strIp, iPort); bool bScanRefresh = false; - // If the connecttion was our scan, we are no longer scanning. + // If the connection was our scan, we are no longer scanning. if (mScanning && mScanning == peer) { - Log(lsINFO) << "Scanning: scan fail: " << strIp << " " << iPort; + Log(lsINFO) << "Pool: Scan: scan fail: " << strIp << " " << iPort; mScanning = Peer::pointer(); // No longer scanning. bScanRefresh = true; // Look for more to scan. } - bool bScanSet = false; - + // Determine if closed peer was redundant. + bool bRedundant = true; { boost::mutex::scoped_lock sl(mPeerLock); boost::unordered_map::iterator itIp; @@ -486,31 +497,30 @@ void ConnectionPool::peerClosed(Peer::pointer peer, const std::string& strIp, in if (itIp == mIpMap.end()) { // Did not find it. Not already connecting or connected. - Log(lsWARNING) << "Pool: Disconnect: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + Log(lsWARNING) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; // XXX Internal error. } else if (mIpMap[ipPeer] == peer) { // We were the identified connection. - Log(lsINFO) << "Pool: Disconnect: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + Log(lsINFO) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; // Delete our entry. mIpMap.erase(itIp); - // We want to connect again. - bScanSet = true; + bRedundant = false; } else { // Found it. But, we were redundent. - Log(lsINFO) << "Pool: Disconnect: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + Log(lsINFO) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; } } - if (bScanSet) + if (!bRedundant) { - // Since we disconnnected, try to schedule for scanning again. - bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second); + // If closed was not redundant schedule if not already scheduled. + bScanRefresh = peerScanSet(ipPeer.first, ipPeer.second) || bScanRefresh; } if (bScanRefresh) @@ -526,7 +536,7 @@ void ConnectionPool::peerVerified(Peer::pointer peer) std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); - Log(lsINFO) << str(boost::format("Scanning: connected: %s %s (scan off)") % strIp % iPort); + Log(lsINFO) << str(boost::format("Pool: Scan: connected: %s %s %s (scan off)") % ADDRESS_SHARED(peer) % strIp % iPort); // Scan completed successfully. { @@ -566,7 +576,7 @@ void ConnectionPool::scanRefresh() if (mScanning) { // Currently scanning, will scan again after completion. - Log(lsTRACE) << "Scanning: already scanning"; + Log(lsTRACE) << "Pool: Scan: already scanning"; nothing(); } @@ -603,7 +613,7 @@ void ConnectionPool::scanRefresh() if (tpNow.is_not_a_date_time()) { - Log(lsINFO) << "Scanning: stop."; + Log(lsINFO) << "Pool: Scan: stop."; (void) mScanTimer.cancel(); } @@ -614,14 +624,14 @@ void ConnectionPool::scanRefresh() (void) mScanTimer.cancel(); - // XXX iInterval *= 2; - iInterval = 0; iInterval = MAX(iInterval, theConfig.PEER_SCAN_INTERVAL_MIN); tpNext = tpNow + boost::posix_time::seconds(iInterval); - Log(lsTRACE) << str(boost::format("Scanning: %s %s (next %s, delay=%s)") - % mScanIp % mScanPort % tpNext % iInterval) << std::endl; + iInterval *= 2; + + Log(lsINFO) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%s)") + % mScanIp % mScanPort % tpNext % iInterval); { ScopedLock sl(theApp->getWalletDB()->getDBLock()); @@ -643,7 +653,8 @@ void ConnectionPool::scanRefresh() } else { - Log(lsINFO) << "Scanning: next: " << tpNow; + Log(lsINFO) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%s)") + % strIpPort % tpNext % (tpNext-tpNow).seconds()); mScanTimer.expires_at(tpNext); mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); diff --git a/src/Peer.cpp b/src/Peer.cpp index fd18cf6090..da00a437c2 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -24,6 +24,7 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) : mConnected(false), + mDetaching(false), mSocketSsl(io_service, ctx), mVerifyTimer(io_service) { @@ -32,8 +33,6 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { - if (error) - Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; #ifdef DEBUG // if (!error) // std::cerr << "Peer::handle_write bytes: "<< bytes_transferred << std::endl; @@ -43,11 +42,14 @@ void Peer::handle_write(const boost::system::error_code& error, size_t bytes_tra if (error) { - detach("hw"); - return; - } + if (!mDetaching) + { + Log(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; - if (!mSendQ.empty()) + detach("hw"); + } + + } else if (!mSendQ.empty()) { PackedMessage::pointer packet = mSendQ.front(); if(packet) @@ -69,43 +71,49 @@ void Peer::setIpPort(const std::string& strIP, int iPort) void Peer::detach(const char *rsn) { - Log(lsDEBUG) << "Peer: Detach: " - << ADDRESS(this) << "> " - << rsn << ": " - << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); - boost::system::error_code ecCancel; - - (void) mVerifyTimer.cancel(); - - mSendQ.clear(); - - // We may close more than once. - boost::system::error_code ecShutdown; - getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); - - getSocket().close(); - - if (mNodePublic.isValid()) + if (!mDetaching) { - theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); + mDetaching = true; // Race is ok. - mNodePublic.clear(); // Be idompotent. + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "> " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); + + boost::system::error_code ecCancel; + + (void) mVerifyTimer.cancel(); + + mSendQ.clear(); + + // We may close more than once. + boost::system::error_code ecShutdown; + getSocket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ecShutdown); + + getSocket().close(); + + if (mNodePublic.isValid()) + { + theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); + + mNodePublic.clear(); // Be idompotent. + } + + if (!mIpPort.first.empty()) + { + // Connection might be part of scanning. Inform connect failed. + // Might need to scan. Inform connection closed. + theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); + + mIpPort.first.clear(); // Be idompotent. + } + + Log(lsDEBUG) << "Peer: Detach: " + << ADDRESS(this) << "< " + << rsn << ": " + << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } - - if (!mIpPort.first.empty()) - { - // Connection might be part of scanning. Inform connect failed. - // Might need to scan. Inform connection closed. - theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); - - mIpPort.first.clear(); // Be idompotent. - } - - Log(lsDEBUG) << "Peer: Detach: " - << ADDRESS(this) << "< " - << rsn << ": " - << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) @@ -235,12 +243,7 @@ void Peer::connected(const boost::system::error_code& error) if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this? iPort = -1; - if (error) - { - Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; - detach("ctd"); - } - else + if (!error) { // Not redundant ip and port, handshake, and start. @@ -251,6 +254,12 @@ void Peer::connected(const boost::system::error_code& error) mSocketSsl.async_handshake(boost::asio::ssl::stream::server, boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)); } + else if (!mDetaching) + { + Log(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; + + detach("ctd"); + } } void Peer::sendPacketForce(PackedMessage::pointer packet) @@ -309,7 +318,7 @@ void Peer::handle_read_header(const boost::system::error_code& error) } start_read_body(msg_len); } - else + else if (!mDetaching) { Log(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrh2"); @@ -323,7 +332,7 @@ void Peer::handle_read_body(const boost::system::error_code& error) processReadBuffer(); start_read_header(); } - else + else if (!mDetaching) { Log(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; detach("hrb"); @@ -342,7 +351,7 @@ void Peer::processReadBuffer() // If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent. if (mConnected == (type == newcoin::mtHELLO)) { - std::cerr << "Wrong message type: " << type << std::endl; + Log(lsWARNING) << "Wrong message type: " << type; detach("prb1"); } else @@ -542,6 +551,9 @@ void Peer::recvHello(newcoin::TMHello& packet) #endif bool bDetach = true; + // Cancel verification timeout. + (void) mVerifyTimer.cancel(); + if (!mNodePublic.setNodePublic(packet.nodepublic())) { Log(lsINFO) << "Recv(Hello): Disconnect: Bad node public key."; @@ -550,50 +562,53 @@ void Peer::recvHello(newcoin::TMHello& packet) { // Unable to verify they have private key for claimed public key. Log(lsINFO) << "Recv(Hello): Disconnect: Failed to verify session."; } - else if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) - { // Already connected, self, or some other reason. - Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection."; - } else { // Successful connection. Log(lsINFO) << "Recv(Hello): Connect: " << mNodePublic.humanNodePublic(); - // Cancel verification timeout. - (void) mVerifyTimer.cancel(); - if (mClientConnect) { // If we connected due to scan, no longer need to scan. theApp->getConnectionPool().peerVerified(shared_from_this()); + } - // No longer connecting as client. - mClientConnect = false; + if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) + { // Already connected, self, or some other reason. + Log(lsINFO) << "Recv(Hello): Disconnect: Extraneous connection."; } else { - // Take a guess at remotes address. - std::string strIP = getSocket().remote_endpoint().address().to_string(); - int iPort = packet.ipv4port(); + if (mClientConnect) + { + // No longer connecting as client. + mClientConnect = false; + } + else + { + // Take a guess at remotes address. + std::string strIP = getSocket().remote_endpoint().address().to_string(); + int iPort = packet.ipv4port(); - theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound); + theApp->getConnectionPool().savePeer(strIP, iPort, UniqueNodeList::vsInbound); + } + + // Consider us connected. No longer accepting mtHELLO. + mConnected = true; + + // XXX Set timer: connection is in grace period to be useful. + // XXX Set timer: connection idle (idle may vary depending on connection type.) + + if ((packet.has_closedledger()) && (packet.closedledger().size() == (256 / 8))) + { + memcpy(mClosedLedgerHash.begin(), packet.closedledger().data(), 256 / 8); + if ((packet.has_previousledger()) && (packet.previousledger().size() == (256 / 8))) + memcpy(mPreviousLedgerHash.begin(), packet.previousledger().data(), 256 / 8); + else mPreviousLedgerHash.zero(); + mClosedLedgerTime = boost::posix_time::second_clock::universal_time(); + } + + bDetach = false; } - - // Consider us connected. No longer accepting mtHELLO. - mConnected = true; - - // XXX Set timer: connection is in grace period to be useful. - // XXX Set timer: connection idle (idle may vary depending on connection type.) - - if ((packet.has_closedledger()) && (packet.closedledger().size() == (256 / 8))) - { - memcpy(mClosedLedgerHash.begin(), packet.closedledger().data(), 256 / 8); - if ((packet.has_previousledger()) && (packet.previousledger().size() == (256 / 8))) - memcpy(mPreviousLedgerHash.begin(), packet.previousledger().data(), 256 / 8); - else mPreviousLedgerHash.zero(); - mClosedLedgerTime = boost::posix_time::second_clock::universal_time(); - } - - bDetach = false; } if (bDetach) diff --git a/src/Peer.h b/src/Peer.h index 6d38399ec7..f4ecc82d30 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -32,6 +32,7 @@ public: private: bool mClientConnect; // In process of connecting as client. bool mConnected; // True, if hello accepted. + bool mDetaching; // True, if detaching. NewcoinAddress mNodePublic; // Node public key of peer. ipPort mIpPort; ipPort mIpPortConnect;