From eb57573f9aba961f55458ab1342c48c139f86029 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 5 Jun 2013 10:58:23 -0700 Subject: [PATCH] Refactor ConnectionPool into IPeers and hide the implementation Conflicts: src/cpp/ripple/Application.h --- modules/ripple_data/ripple_data.h | 4 +- modules/ripple_main/ripple_main.cpp | 7 +- newcoin.vcxproj | 4 +- newcoin.vcxproj.filters | 12 +- src/cpp/ripple/Application.cpp | 6 +- src/cpp/ripple/Application.h | 116 ++++----- src/cpp/ripple/ConnectionPool.h | 121 --------- src/cpp/ripple/LedgerAcquire.cpp | 8 +- src/cpp/ripple/LedgerConsensus.cpp | 16 +- src/cpp/ripple/LedgerMaster.cpp | 2 +- src/cpp/ripple/NetworkOPs.cpp | 18 +- src/cpp/ripple/PeerDoor.cpp | 7 +- src/cpp/ripple/RPCHandler.cpp | 4 +- src/cpp/ripple/TransactionAcquire.cpp | 2 +- src/cpp/ripple/ripple_IPeers.h | 75 ++++++ src/cpp/ripple/ripple_Peer.cpp | 24 +- .../{ConnectionPool.cpp => ripple_Peers.cpp} | 245 +++++++++++++----- src/cpp/ripple/ripple_UniqueNodeList.cpp | 2 +- 18 files changed, 363 insertions(+), 310 deletions(-) delete mode 100644 src/cpp/ripple/ConnectionPool.h create mode 100644 src/cpp/ripple/ripple_IPeers.h rename src/cpp/ripple/{ConnectionPool.cpp => ripple_Peers.cpp} (65%) diff --git a/modules/ripple_data/ripple_data.h b/modules/ripple_data/ripple_data.h index a41a74bc68..2d8a5a38d0 100644 --- a/modules/ripple_data/ripple_data.h +++ b/modules/ripple_data/ripple_data.h @@ -57,13 +57,13 @@ // VFALCO: TODO, try to reduce these dependencies #include "../ripple_basics/ripple_basics.h" -// VFALCO: TODO, resolve the location of this file +// VFALCO: TODO, figure out a good place for this file, perhaps give it some +// additional hierarchy via directories. #include "ripple.pb.h" #include "crypto/ripple_CBigNum.h" #include "crypto/ripple_Base58.h" // VFALCO: TODO, Can be moved to .cpp if we clean up setAlphabet stuff #include "crypto/ripple_Base58Data.h" -// #include "src/cpp/ripple/ProofOfWork.h" #include "protocol/ripple_FieldNames.h" #include "protocol/ripple_PackedMessage.h" diff --git a/modules/ripple_main/ripple_main.cpp b/modules/ripple_main/ripple_main.cpp index 7592eb98a8..4bc5f084d9 100644 --- a/modules/ripple_main/ripple_main.cpp +++ b/modules/ripple_main/ripple_main.cpp @@ -111,6 +111,7 @@ #include "src/cpp/ripple/ripple_IHashRouter.h" #include "src/cpp/ripple/ripple_ILoadFeeTrack.h" #include "src/cpp/ripple/ripple_Peer.h" // VFALCO: TODO Rename to IPeer +#include "src/cpp/ripple/ripple_IPeers.h" #include "src/cpp/ripple/ripple_IProofOfWorkFactory.h" #include "src/cpp/ripple/ripple_IUniqueNodeList.h" #include "src/cpp/ripple/ripple_IValidations.h" @@ -119,9 +120,6 @@ // //------------------------------------------------------------------------------ - - - // VFALCO: NOTE, Order matters! If you get compile errors, move just 1 // include upwards as little as possible to fix it. // @@ -139,7 +137,6 @@ #include "src/cpp/ripple/CallRPC.h" #include "src/cpp/ripple/CanonicalTXSet.h" #include "src/cpp/ripple/ChangeTransactor.h" -#include "src/cpp/ripple/ConnectionPool.h" #include "src/cpp/ripple/FeatureTable.h" #include "src/cpp/ripple/HTTPRequest.h" #include "src/cpp/ripple/HashPrefixes.h" @@ -226,7 +223,6 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength) #include "src/cpp/ripple/CallRPC.cpp" #include "src/cpp/ripple/CanonicalTXSet.cpp" #include "src/cpp/ripple/ChangeTransactor.cpp" // no log -#include "src/cpp/ripple/ConnectionPool.cpp" #include "src/cpp/ripple/Contract.cpp" // no log #include "src/cpp/ripple/DBInit.cpp" #include "src/cpp/ripple/HashedObject.cpp" @@ -305,6 +301,7 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength) #include "src/cpp/ripple/ripple_LogWebsockets.cpp" #include "src/cpp/ripple/ripple_LoadFeeTrack.cpp" #include "src/cpp/ripple/ripple_Peer.cpp" +#include "src/cpp/ripple/ripple_Peers.cpp" #include "src/cpp/ripple/ripple_ProofOfWork.cpp" #include "src/cpp/ripple/ripple_ProofOfWorkFactory.cpp" #include "src/cpp/ripple/ripple_Validations.cpp" diff --git a/newcoin.vcxproj b/newcoin.vcxproj index 7073dfb2e5..84ad582e9f 100644 --- a/newcoin.vcxproj +++ b/newcoin.vcxproj @@ -758,7 +758,7 @@ true true - + true true true @@ -1645,7 +1645,7 @@ - + diff --git a/newcoin.vcxproj.filters b/newcoin.vcxproj.filters index 38d612d8b0..a521911fbd 100644 --- a/newcoin.vcxproj.filters +++ b/newcoin.vcxproj.filters @@ -600,9 +600,6 @@ 1. Modules\ripple_main\_unfactored\main - - 1. Modules\ripple_main\_unfactored\network - 1. Modules\ripple_main\_unfactored\network @@ -813,6 +810,9 @@ 1. Modules\ripple_main\refactored + + 1. Modules\ripple_main\refactored + @@ -1310,9 +1310,6 @@ 1. Modules\ripple_main\_unfactored\network - - 1. Modules\ripple_main\_unfactored\network - 1. Modules\ripple_main\_unfactored\network @@ -1511,6 +1508,9 @@ 1. Modules\ripple_main\refactored + + 1. Modules\ripple_main\refactored + diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 291aedba3d..04eaeb41f8 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -48,6 +48,7 @@ Application::Application () , mValidations (IValidations::New ()) , mUNL (IUniqueNodeList::New (mIOService)) , mProofOfWorkFactory (IProofOfWorkFactory::New ()) + , mPeers (IPeers::New (mIOService)) // VFALCO: End new stuff // VFALCO: TODO replace all NULL with nullptr , mRpcDB (NULL) @@ -61,8 +62,7 @@ Application::Application () #ifdef USE_LEVELDB , mHashNodeLDB (NULL) #endif - , mConnectionPool (mIOService) - , mPeerDoor (NULL) + , mPeerDoor (NULL) , mRPCDoor (NULL) , mWSPublicDoor (NULL) , mWSPrivateDoor (NULL) @@ -369,7 +369,7 @@ void Application::setup() // Begin connecting to network. // if (!theConfig.RUN_STANDALONE) - mConnectionPool.start(); + mPeers->start(); if (theConfig.RUN_STANDALONE) diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h index 33172a3f01..5ab15c2b3f 100644 --- a/src/cpp/ripple/Application.h +++ b/src/cpp/ripple/Application.h @@ -8,7 +8,6 @@ #include "../database/database.h" #include "LedgerMaster.h" -#include "ConnectionPool.h" #include "LedgerAcquire.h" #include "TransactionMaster.h" #include "Wallet.h" @@ -30,6 +29,7 @@ class ILoadFeeTrack; class IValidations; class IUniqueNodeList; class IProofOfWorkFactory; +class IPeers; class RPCDoor; class PeerDoor; @@ -38,65 +38,10 @@ typedef TaggedCache< uint256, SLE, UptimeTimerAdapter> SLECache; class Application { - boost::asio::io_service mIOService, mAuxService; - boost::asio::io_service::work mIOWork, mAuxWork; - - boost::recursive_mutex mMasterLock; - - Wallet mWallet; - LedgerMaster mLedgerMaster; - LedgerAcquireMaster mMasterLedgerAcquire; - TransactionMaster mMasterTransaction; - NetworkOPs mNetOps; - NodeCache mTempNodeCache; - HashedObjectStore mHashedObjectStore; - SLECache mSLECache; - SNTPClient mSNTPClient; - JobQueue mJobQueue; - LoadManager mLoadMgr; - TXQueue mTxnQueue; - OrderBookDB mOrderBookDB; - - // VFALCO: Clean stuff - beast::ScopedPointer mFeatures; - beast::ScopedPointer mFeeVote; - beast::ScopedPointer mFeeTrack; - beast::ScopedPointer mHashRouter; - beast::ScopedPointer mValidations; - beast::ScopedPointer mUNL; - beast::ScopedPointer mProofOfWorkFactory; - // VFALCO: End Clean stuff - - DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB; - - leveldb::DB *mHashNodeLDB; - leveldb::DB *mEphemeralLDB; - - ConnectionPool mConnectionPool; - PeerDoor* mPeerDoor; - RPCDoor* mRPCDoor; - WSDoor* mWSPublicDoor; - WSDoor* mWSPrivateDoor; - - boost::asio::deadline_timer mSweepTimer; - - std::map mPeerMap; - boost::recursive_mutex mPeerMapLock; - - volatile bool mShutdown; - - void updateTables(bool); - void startNewLedger(); - bool loadOldLedger(const std::string&); - public: Application(); ~Application(); - ConnectionPool& getConnectionPool() { return mConnectionPool; } - - IUniqueNodeList& getUNL() { return *mUNL; } - Wallet& getWallet() { return mWallet ; } NetworkOPs& getOPs() { return mNetOps; } @@ -121,7 +66,9 @@ public: IFeeVote& getFeeVote() { return *mFeeVote; } IHashRouter& getHashRouter() { return *mHashRouter; } IValidations& getValidations() { return *mValidations; } + IUniqueNodeList& getUNL() { return *mUNL; } IProofOfWorkFactory& getProofOfWorkFactory() { return *mProofOfWorkFactory; } + IPeers& getPeers () { return *mPeers; } // VFALCO: TODO, Move these to the .cpp bool running() { return mTxnDB != NULL; } // VFALCO: TODO, replace with nullptr when beast is available @@ -144,12 +91,59 @@ public: void stop(); void sweep(); -#ifdef DEBUG - void mustHaveMasterLock() { bool tl = mMasterLock.try_lock(); assert(tl); mMasterLock.unlock(); } -#else - void mustHaveMasterLock() { ; } -#endif +private: + boost::asio::io_service mIOService; + boost::asio::io_service mAuxService; + boost::asio::io_service::work mIOWork; + boost::asio::io_service::work mAuxWork; + boost::recursive_mutex mMasterLock; + + Wallet mWallet; + LedgerMaster mLedgerMaster; + LedgerAcquireMaster mMasterLedgerAcquire; + TransactionMaster mMasterTransaction; + NetworkOPs mNetOps; + NodeCache mTempNodeCache; + HashedObjectStore mHashedObjectStore; + SLECache mSLECache; + SNTPClient mSNTPClient; + JobQueue mJobQueue; + LoadManager mLoadMgr; + TXQueue mTxnQueue; + OrderBookDB mOrderBookDB; + + // VFALCO: Clean stuff + beast::ScopedPointer mFeatures; + beast::ScopedPointer mFeeVote; + beast::ScopedPointer mFeeTrack; + beast::ScopedPointer mHashRouter; + beast::ScopedPointer mValidations; + beast::ScopedPointer mUNL; + beast::ScopedPointer mProofOfWorkFactory; + beast::ScopedPointer mPeers; + // VFALCO: End Clean stuff + + DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB; + + leveldb::DB *mHashNodeLDB; + leveldb::DB *mEphemeralLDB; + + PeerDoor* mPeerDoor; + RPCDoor* mRPCDoor; + WSDoor* mWSPublicDoor; + WSDoor* mWSPrivateDoor; + + boost::asio::deadline_timer mSweepTimer; + + std::map mPeerMap; + boost::recursive_mutex mPeerMapLock; + + volatile bool mShutdown; + + void updateTables(bool); + void startNewLedger(); + bool loadOldLedger(const std::string&); }; extern Application* theApp; diff --git a/src/cpp/ripple/ConnectionPool.h b/src/cpp/ripple/ConnectionPool.h deleted file mode 100644 index 503eccc428..0000000000 --- a/src/cpp/ripple/ConnectionPool.h +++ /dev/null @@ -1,121 +0,0 @@ -#ifndef __CONNECTION_POOL__ -#define __CONNECTION_POOL__ - -#include - -#include -#include - -// -// Access to the Ripple network. -// -class ConnectionPool -{ -private: - boost::recursive_mutex mPeerLock; - uint64 mLastPeer; - int mPhase; - - typedef std::pair naPeer; - typedef std::pair pipPeer; - typedef std::map::value_type vtPeer; - - // Peers we are connecting with and non-thin peers we are connected to. - // Only peers we know the connection ip for are listed. - // We know the ip and port for: - // - All outbound connections - // - Some inbound connections (which we figured out). - boost::unordered_map mIpMap; - - // Non-thin peers which we are connected to. - // Peers we have the public key for. - typedef boost::unordered_map::value_type vtConMap; - boost::unordered_map mConnectedMap; - - // Connections with have a 64-bit identifier - boost::unordered_map mPeerIdMap; - - Peer::pointer mScanning; - boost::asio::deadline_timer mScanTimer; - std::string mScanIp; - int mScanPort; - - void scanHandler(const boost::system::error_code& ecResult); - - boost::asio::deadline_timer mPolicyTimer; - - void policyHandler(const boost::system::error_code& ecResult); - - // Peers we are establishing a connection with as a client. - // int miConnectStarting; - - bool peerAvailable(std::string& strIp, int& iPort); - bool peerScanSet(const std::string& strIp, int iPort); - - Peer::pointer peerConnect(const std::string& strIp, int iPort); - -public: - ConnectionPool(boost::asio::io_service& io_service) : - mLastPeer(0), mPhase(0), mScanTimer(io_service), mPolicyTimer(io_service) - { ; } - - // Begin enforcing connection policy. - void start(); - - // Send message to network. - int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg); - void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg); - void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg); - - // Manual connection request. - // Queue for immediate scanning. - void connectTo(const std::string& strIp, int iPort); - - // - // Peer connectivity notification. - // - bool getTopNAddrs(int n,std::vector& addrs); - bool savePeer(const std::string& strIp, int iPort, char code); - - // We know peers node public key. - // <-- bool: false=reject - bool peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort); - - // No longer connected. - void peerDisconnected(Peer::ref peer, const RippleAddress& naPeer); - - // As client accepted. - void peerVerified(Peer::ref peer); - - // As client failed connect and be accepted. - void peerClosed(Peer::ref peer, const std::string& strIp, int iPort); - - int getPeerCount(); - Json::Value getPeersJson(); - std::vector getPeerVector(); - - // Peer 64-bit ID function - uint64 assignPeerId(); - Peer::pointer getPeerById(const uint64& id); - bool hasPeer(const uint64& id); - - // - // Scanning - // - - void scanRefresh(); - - // - // Connection policy - // - void policyLowWater(); - void policyEnforce(); - - // configured connections - void makeConfigured(); -}; - -extern void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort); - -#endif -// vim:ts=4 diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index 3c528a5b1a..2043962577 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -242,7 +242,7 @@ void LedgerAcquire::noAwaitData() void LedgerAcquire::addPeers() { - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); int vSize = peerList.size(); if (vSize == 0) @@ -399,7 +399,7 @@ void LedgerAcquire::trigger(Peer::ref peer) for (boost::unordered_map::iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it) { - Peer::pointer iPeer = theApp->getConnectionPool().getPeerById(it->first); + Peer::pointer iPeer = theApp->getPeers().getPeerById(it->first); if (iPeer) { mByHash = false; @@ -554,7 +554,7 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL) PackedMessage::pointer packet = boost::make_shared(tmGL, ripple::mtGET_LEDGER); for (boost::unordered_map::iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it) { - Peer::pointer peer = theApp->getConnectionPool().getPeerById(it->first); + Peer::pointer peer = theApp->getPeers().getPeerById(it->first); if (peer) peer->sendPacket(packet, false); } @@ -577,7 +577,7 @@ int PeerSet::getPeerCount() const { int ret = 0; for (boost::unordered_map::const_iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it) - if (theApp->getConnectionPool().hasPeer(it->first)) + if (theApp->getPeers().hasPeer(it->first)) ++ret; return ret; } diff --git a/src/cpp/ripple/LedgerConsensus.cpp b/src/cpp/ripple/LedgerConsensus.cpp index 1e2c8e5225..d4589764b6 100644 --- a/src/cpp/ripple/LedgerConsensus.cpp +++ b/src/cpp/ripple/LedgerConsensus.cpp @@ -194,7 +194,7 @@ void LedgerConsensus::checkOurValidation() ripple::TMValidation val; val.set_validation(&validation[0], validation.size()); #if 0 - theApp->getConnectionPool().relayMessage(NULL, + theApp->getPeers().relayMessage(NULL, boost::make_shared(val, ripple::mtVALIDATION)); #endif theApp->getOPs().setLastValidation(v); @@ -450,7 +450,7 @@ void LedgerConsensus::sendHaveTxSet(const uint256& hash, bool direct) msg.set_hash(hash.begin(), 256 / 8); msg.set_status(direct ? ripple::tsHAVE : ripple::tsCAN_GET); PackedMessage::pointer packet = boost::make_shared(msg, ripple::mtHAVE_SET); - theApp->getConnectionPool().relayMessage(NULL, packet); + theApp->getPeers().relayMessage(NULL, packet); } void LedgerConsensus::adjustCount(SHAMap::ref map, const std::vector& peers) @@ -483,7 +483,7 @@ void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger) s.set_lastseq(uMax); PackedMessage::pointer packet = boost::make_shared(s, ripple::mtSTATUS_CHANGE); - theApp->getConnectionPool().relayMessage(NULL, packet); + theApp->getPeers().relayMessage(NULL, packet); WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer"; } @@ -803,7 +803,7 @@ void LedgerConsensus::startAcquiring(TransactionAcquire::pointer acquire) } } - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); BOOST_FOREACH(Peer::ref peer, peerList) { if (peer->hasTxSet(acquire->getHash())) @@ -828,7 +828,7 @@ void LedgerConsensus::propose() std::vector sig = mOurPosition->sign(); prop.set_nodepubkey(&pubKey[0], pubKey.size()); prop.set_signature(&sig[0], sig.size()); - theApp->getConnectionPool().relayMessage(NULL, + theApp->getPeers().relayMessage(NULL, boost::make_shared(prop, ripple::mtPROPOSE_LEDGER)); } @@ -866,7 +866,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec msg.set_status(ripple::tsNEW); msg.set_receivetimestamp(theApp->getOPs().getNetworkTimeNC()); PackedMessage::pointer packet = boost::make_shared(msg, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessage(NULL, packet); + theApp->getPeers().relayMessage(NULL, packet); } } @@ -1016,7 +1016,7 @@ void LedgerConsensus::playbackProposals() nodepubkey signature PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); - theApp->getConnectionPool().relayMessageBut(peers, message); + theApp->getPeers().relayMessageBut(peers, message); } #endif } @@ -1230,7 +1230,7 @@ void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer) std::vector validation = v->getSigned(); ripple::TMValidation val; val.set_validation(&validation[0], validation.size()); - int j = theApp->getConnectionPool().relayMessage(NULL, + int j = theApp->getPeers().relayMessage(NULL, boost::make_shared(val, ripple::mtVALIDATION)); WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash << " to " << j << " peers"; } diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 2146ee5772..194b6724c2 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -315,7 +315,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l tmBH.set_query(true); tmBH.set_seq(ledgerSeq); tmBH.set_ledgerhash(ledgerHash.begin(), 32); - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); Peer::pointer target; int count = 0; diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index ea538ff433..0cb338053b 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -302,7 +302,7 @@ void NetworkOPs::runTransactionQueue() tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessageBut(peers, packet); + theApp->getPeers().relayMessageBut(peers, packet); } } @@ -402,7 +402,7 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessageBut(peers, packet); + theApp->getPeers().relayMessageBut(peers, packet); } } @@ -601,7 +601,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result) theApp->getLoadManager().noDeadLock(); - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); // do we have sufficient peers? If not, we are disconnected. if (peerList.size() < theConfig.NETWORK_QUORUM) @@ -639,7 +639,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result) void NetworkOPs::tryStartConsensus() { uint256 networkClosed; - bool ledgerChange = checkLastClosedLedger(theApp->getConnectionPool().getPeerVector(), networkClosed); + bool ledgerChange = checkLastClosedLedger(theApp->getPeers().getPeerVector(), networkClosed); if (networkClosed.isZero()) return; @@ -822,7 +822,7 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger, bool duringCo hash = newLedger->getHash(); s.set_ledgerhash(hash.begin(), hash.size()); PackedMessage::pointer packet = boost::make_shared(s, ripple::mtSTATUS_CHANGE); - theApp->getConnectionPool().relayMessage(NULL, packet); + theApp->getPeers().relayMessage(NULL, packet); } int NetworkOPs::beginConsensus(const uint256& networkClosed, Ledger::pointer closingLedger) @@ -865,7 +865,7 @@ bool NetworkOPs::haveConsensusObject() else { // we need to get into the consensus process uint256 networkClosed; - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); bool ledgerChange = checkLastClosedLedger(peerList, networkClosed); if (!ledgerChange) { @@ -923,7 +923,7 @@ void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal, std::set peers; theApp->getHashRouter().swapSet(proposal->getHashRouter(), peers, SF_RELAYED); PackedMessage::pointer message = boost::make_shared(*set, ripple::mtPROPOSE_LEDGER); - theApp->getConnectionPool().relayMessageBut(peers, message); + theApp->getPeers().relayMessageBut(peers, message); } else WriteLog (lsINFO, NetworkOPs) << "Not relaying trusted proposal"; @@ -994,7 +994,7 @@ void NetworkOPs::mapComplete(const uint256& hash, SHAMap::ref map) void NetworkOPs::endConsensus(bool correctLCL) { uint256 deadLedger = mLedgerMaster->getClosedLedger()->getParentHash(); - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); BOOST_FOREACH(Peer::ref it, peerList) if (it && (it->getClosedLedgerHash() == deadLedger)) { @@ -1282,7 +1282,7 @@ Json::Value NetworkOPs::getServerInfo(bool human, bool admin) if (fp != 0) info["fetch_pack"] = Json::UInt(fp); - info["peers"] = theApp->getConnectionPool().getPeerCount(); + info["peers"] = theApp->getPeers().getPeerCount(); Json::Value lastClose = Json::objectValue; lastClose["proposers"] = theApp->getOPs().getPreviousProposers(); diff --git a/src/cpp/ripple/PeerDoor.cpp b/src/cpp/ripple/PeerDoor.cpp index e342780171..f91bca5e3e 100644 --- a/src/cpp/ripple/PeerDoor.cpp +++ b/src/cpp/ripple/PeerDoor.cpp @@ -41,7 +41,7 @@ void PeerDoor::startListening() Peer::pointer new_connection = Peer::New ( mAcceptor.get_io_service(), mCtx, - theApp->getConnectionPool().assignPeerId(), + theApp->getPeers().assignPeerId(), true); mAcceptor.async_accept(new_connection->getSocket(), @@ -70,10 +70,12 @@ void PeerDoor::handleConnect(Peer::pointer new_connection, mDelayTimer.async_wait(boost::bind(&PeerDoor::startListening, this)); } else + { startListening(); + } } -void initSSLContext(boost::asio::ssl::context& context, +void initSSLContext (boost::asio::ssl::context& context, std::string key_file, std::string cert_file, std::string chain_file) { SSL_CTX* sslContext = context.native_handle(); @@ -95,6 +97,7 @@ void initSSLContext(boost::asio::ssl::context& context, if (!chain_file.empty()) { + // VFALCO: Replace fopen() with RAII FILE *f = fopen(chain_file.c_str(), "r"); if (!f) throw std::runtime_error("Unable to open chain file"); diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 4ccdf927b8..5a874b4ced 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -625,7 +625,7 @@ Json::Value RPCHandler::doConnect(Json::Value jvRequest, int& cost, ScopedLock& int iPort = jvRequest.isMember("port") ? jvRequest["port"].asInt() : -1; // XXX Validate legal IP and port - theApp->getConnectionPool().connectTo(strIp, iPort); + theApp->getPeers().connectTo(strIp, iPort); return "connecting"; } @@ -772,7 +772,7 @@ Json::Value RPCHandler::doPeers(Json::Value, int& cost, ScopedLock& MasterLockHo { Json::Value jvResult(Json::objectValue); - jvResult["peers"] = theApp->getConnectionPool().getPeersJson(); + jvResult["peers"] = theApp->getPeers().getPeersJson(); return jvResult; } diff --git a/src/cpp/ripple/TransactionAcquire.cpp b/src/cpp/ripple/TransactionAcquire.cpp index 07342084dd..ca7b7f689b 100644 --- a/src/cpp/ripple/TransactionAcquire.cpp +++ b/src/cpp/ripple/TransactionAcquire.cpp @@ -63,7 +63,7 @@ void TransactionAcquire::onTimer(bool progress) WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash(); bool found = false; - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); BOOST_FOREACH(Peer::ref peer, peerList) { if (peer->hasTxSet(getHash())) diff --git a/src/cpp/ripple/ripple_IPeers.h b/src/cpp/ripple/ripple_IPeers.h new file mode 100644 index 0000000000..35f2aac458 --- /dev/null +++ b/src/cpp/ripple/ripple_IPeers.h @@ -0,0 +1,75 @@ + + +#ifndef RIPPLE_IPEERS_H +#define RIPPLE_IPEERS_H + +/** Manages the set of connected peers. +*/ +class IPeers +{ +public: + static IPeers* New (boost::asio::io_service& io_service); + + virtual ~IPeers () { } + + // Begin enforcing connection policy. + virtual void start () = 0; + + // Send message to network. + virtual int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg) = 0; + virtual void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) = 0; + virtual void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) = 0; + + // Manual connection request. + // Queue for immediate scanning. + virtual void connectTo(const std::string& strIp, int iPort) = 0; + + // + // Peer connectivity notification. + // + virtual bool getTopNAddrs(int n,std::vector& addrs) = 0; + virtual bool savePeer(const std::string& strIp, int iPort, char code) = 0; + + // We know peers node public key. + // <-- bool: false=reject + virtual bool peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) = 0; + + // No longer connected. + virtual void peerDisconnected(Peer::ref peer, const RippleAddress& naPeer) = 0; + + // As client accepted. + virtual void peerVerified(Peer::ref peer) = 0; + + // As client failed connect and be accepted. + virtual void peerClosed(Peer::ref peer, const std::string& strIp, int iPort) = 0; + + virtual int getPeerCount() = 0; + virtual Json::Value getPeersJson() = 0; + virtual std::vector getPeerVector() = 0; + + // Peer 64-bit ID function + virtual uint64 assignPeerId() = 0; + virtual Peer::pointer getPeerById(const uint64& id) = 0; + virtual bool hasPeer(const uint64& id) = 0; + + // + // Scanning + // + + virtual void scanRefresh() = 0; + + // + // Connection policy + // + virtual void policyLowWater() = 0; + virtual void policyEnforce() = 0; // VFALCO: This and others can be made private + + // configured connections + virtual void makeConfigured() = 0; +}; + +// VFALCO: TODO Put this in some group of utilities +extern void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort); + +#endif +// vim:ts=4 diff --git a/src/cpp/ripple/ripple_Peer.cpp b/src/cpp/ripple/ripple_Peer.cpp index 5357014ac3..3cf80d8edd 100644 --- a/src/cpp/ripple/ripple_Peer.cpp +++ b/src/cpp/ripple/ripple_Peer.cpp @@ -240,7 +240,7 @@ void PeerImp::detach(const char *rsn, bool onIOStrand) if (mNodePublic.isValid()) { - theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic); + theApp->getPeers().peerDisconnected(shared_from_this(), mNodePublic); mNodePublic.clear(); // Be idempotent. } @@ -249,7 +249,7 @@ void PeerImp::detach(const char *rsn, bool onIOStrand) { // Connection might be part of scanning. Inform connect failed. // Might need to scan. Inform connection closed. - theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); + theApp->getPeers().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second); mIpPort.first.clear(); // Be idempotent. } @@ -897,10 +897,10 @@ void PeerImp::recvHello(ripple::TMHello& packet) if (mClientConnect) { // If we connected due to scan, no longer need to scan. - theApp->getConnectionPool().peerVerified(shared_from_this()); + theApp->getPeers().peerVerified(shared_from_this()); } - if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) + if (! theApp->getPeers().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort())) { // Already connected, self, or some other reason. WriteLog (lsINFO, Peer) << "Recv(Hello): Disconnect: Extraneous connection."; } @@ -926,7 +926,7 @@ void PeerImp::recvHello(ripple::TMHello& packet) // Don't save IP address if the node wants privacy. // Note: We don't go so far as to delete it. If a node which has previously announced itself now wants // privacy, it should at least change its port. - theApp->getConnectionPool().savePeer(strIP, iPort, IUniqueNodeList::vsInbound); + theApp->getPeers().savePeer(strIP, iPort, IUniqueNodeList::vsInbound); } } @@ -1100,7 +1100,7 @@ static void checkPropose(Job& job, boost::shared_ptr packe std::set peers; theApp->getHashRouter().swapSet(proposal->getHashRouter(), peers, SF_RELAYED); PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); - theApp->getConnectionPool().relayMessageBut(peers, message); + theApp->getPeers().relayMessageBut(peers, message); } else WriteLog (lsDEBUG, Peer) << "Not relaying untrusted proposal"; @@ -1208,7 +1208,7 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig theApp->getHashRouter().swapSet(signingHash, peers, SF_RELAYED)) { PackedMessage::pointer message = boost::make_shared(*packet, ripple::mtVALIDATION); - theApp->getConnectionPool().relayMessageBut(peers, message); + theApp->getPeers().relayMessageBut(peers, message); } } #ifndef TRUST_NETWORK @@ -1279,7 +1279,7 @@ void PeerImp::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHol MasterLockHolder.unlock(); std::vector addrs; - theApp->getConnectionPool().getTopNAddrs(30, addrs); + theApp->getPeers().getTopNAddrs(30, addrs); if (!addrs.empty()) { @@ -1321,7 +1321,7 @@ void PeerImp::recvPeers(ripple::TMPeers& packet) { //WriteLog (lsINFO, Peer) << "Peer: Learning: " << ADDRESS(this) << ": " << i << ": " << strIP << " " << iPort; - theApp->getConnectionPool().savePeer(strIP, iPort, IUniqueNodeList::vsTold); + theApp->getPeers().savePeer(strIP, iPort, IUniqueNodeList::vsTold); } } } @@ -1584,7 +1584,7 @@ void PeerImp::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockH if (packet.has_querytype() && !packet.has_requestcookie()) { WriteLog (lsDEBUG, Peer) << "Trying to route TX set request"; - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); std::vector usablePeers; BOOST_FOREACH(Peer::ref peer, peerList) { @@ -1634,7 +1634,7 @@ void PeerImp::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockH uint32 seq = 0; if (packet.has_ledgerseq()) seq = packet.ledgerseq(); - std::vector peerList = theApp->getConnectionPool().getPeerVector(); + std::vector peerList = theApp->getPeers().getPeerVector(); std::vector usablePeers; BOOST_FOREACH(Peer::ref peer, peerList) { @@ -1816,7 +1816,7 @@ void PeerImp::recvLedger(const boost::shared_ptr& packet_p if (packet.has_requestcookie()) { - Peer::pointer target = theApp->getConnectionPool().getPeerById(packet.requestcookie()); + Peer::pointer target = theApp->getPeers().getPeerById(packet.requestcookie()); if (target) { packet.clear_requestcookie(); diff --git a/src/cpp/ripple/ConnectionPool.cpp b/src/cpp/ripple/ripple_Peers.cpp similarity index 65% rename from src/cpp/ripple/ConnectionPool.cpp rename to src/cpp/ripple/ripple_Peers.cpp index 5ccc491579..28313f3e9f 100644 --- a/src/cpp/ripple/ConnectionPool.cpp +++ b/src/cpp/ripple/ripple_Peers.cpp @@ -1,23 +1,123 @@ -#include "ConnectionPool.h" - -#include -#include -#include -#include -#include -#include - -#include "PeerDoor.h" -#include "Application.h" - // VFALCO: TODO, make this an inline function #define ADDRESS_SHARED(p) strHex(uint64( ((char*) (p).get()) - ((char*) 0))) // How often to enforce policies. #define POLICY_INTERVAL_SECONDS 5 -SETUP_LOG (ConnectionPool) +class Peers; + +SETUP_LOG (Peers) + +class Peers : public IPeers +{ +public: + explicit Peers (boost::asio::io_service& io_service) + : mLastPeer (0) + , mPhase (0) + , mScanTimer (io_service) + , mPolicyTimer (io_service) + { + } + + // Begin enforcing connection policy. + void start(); + + // Send message to network. + int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg); + void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg); + void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg); + + // Manual connection request. + // Queue for immediate scanning. + void connectTo(const std::string& strIp, int iPort); + + // + // Peer connectivity notification. + // + bool getTopNAddrs(int n,std::vector& addrs); + bool savePeer(const std::string& strIp, int iPort, char code); + + // We know peers node public key. + // <-- bool: false=reject + bool peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort); + + // No longer connected. + void peerDisconnected(Peer::ref peer, const RippleAddress& naPeer); + + // As client accepted. + void peerVerified(Peer::ref peer); + + // As client failed connect and be accepted. + void peerClosed(Peer::ref peer, const std::string& strIp, int iPort); + + int getPeerCount(); + Json::Value getPeersJson(); + std::vector getPeerVector(); + + // Peer 64-bit ID function + uint64 assignPeerId(); + Peer::pointer getPeerById(const uint64& id); + bool hasPeer(const uint64& id); + + // + // Scanning + // + + void scanRefresh(); + + // + // Connection policy + // + void policyLowWater(); + void policyEnforce(); + + // configured connections + void makeConfigured(); + +private: + boost::recursive_mutex mPeerLock; + uint64 mLastPeer; + int mPhase; + + typedef std::pair naPeer; + typedef std::pair pipPeer; + typedef std::map::value_type vtPeer; + + // Peers we are connecting with and non-thin peers we are connected to. + // Only peers we know the connection ip for are listed. + // We know the ip and port for: + // - All outbound connections + // - Some inbound connections (which we figured out). + boost::unordered_map mIpMap; + + // Non-thin peers which we are connected to. + // Peers we have the public key for. + typedef boost::unordered_map::value_type vtConMap; + boost::unordered_map mConnectedMap; + + // Connections with have a 64-bit identifier + boost::unordered_map mPeerIdMap; + + Peer::pointer mScanning; + boost::asio::deadline_timer mScanTimer; + std::string mScanIp; + int mScanPort; + + void scanHandler(const boost::system::error_code& ecResult); + + boost::asio::deadline_timer mPolicyTimer; + + void policyHandler(const boost::system::error_code& ecResult); + + // Peers we are establishing a connection with as a client. + // int miConnectStarting; + + bool peerAvailable(std::string& strIp, int& iPort); + bool peerScanSet(const std::string& strIp, int iPort); + + Peer::pointer peerConnect(const std::string& strIp, int iPort); +}; void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) { @@ -28,7 +128,7 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) iPort = boost::lexical_cast(vIpPort[1]); } -void ConnectionPool::start() +void Peers::start() { if (theConfig.RUN_STANDALONE) return; @@ -40,7 +140,7 @@ void ConnectionPool::start() scanRefresh(); } -bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) +bool Peers::getTopNAddrs(int n,std::vector& addrs) { // XXX Filter out other local addresses (like ipv6) Database* db = theApp->getWalletDB()->getDB(); @@ -58,7 +158,7 @@ bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs) return true; } -bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) +bool Peers::savePeer(const std::string& strIp, int iPort, char code) { bool bNew = false; @@ -96,7 +196,7 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code) return bNew; } -Peer::pointer ConnectionPool::getPeerById(const uint64& id) +Peer::pointer Peers::getPeerById(const uint64& id) { boost::recursive_mutex::scoped_lock sl(mPeerLock); const boost::unordered_map::iterator& it = mPeerIdMap.find(id); @@ -105,7 +205,7 @@ Peer::pointer ConnectionPool::getPeerById(const uint64& id) return it->second; } -bool ConnectionPool::hasPeer(const uint64& id) +bool Peers::hasPeer(const uint64& id) { boost::recursive_mutex::scoped_lock sl(mPeerLock); return mPeerIdMap.find(id) != mPeerIdMap.end(); @@ -115,7 +215,7 @@ bool ConnectionPool::hasPeer(const uint64& id) // too. // // <-- true, if a peer is available to connect to -bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) +bool Peers::peerAvailable(std::string& strIp, int& iPort) { Database* db = theApp->getWalletDB()->getDB(); std::vector vstrIpPort; @@ -159,7 +259,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort) } // Make sure we have at least low water connections. -void ConnectionPool::policyLowWater() +void Peers::policyLowWater() { std::string strIp; int iPort; @@ -168,7 +268,7 @@ void ConnectionPool::policyLowWater() if (getPeerCount() > theConfig.PEER_CONNECT_LOW_WATER) { // Above low water mark, don't need more connections. - WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER; + WriteLog (lsTRACE, Peers) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER; nothing(); } @@ -182,7 +282,7 @@ void ConnectionPool::policyLowWater() else if (!peerAvailable(strIp, iPort)) { // No more connections available to start. - WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: no peers available."; + WriteLog (lsTRACE, Peers) << "Pool: Low water: no peers available."; // XXX Might ask peers for more ips. nothing(); @@ -190,11 +290,11 @@ void ConnectionPool::policyLowWater() else { // Try to start connection. - WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: start connection."; + WriteLog (lsTRACE, Peers) << "Pool: Low water: start connection."; if (!peerConnect(strIp, iPort)) { - WriteLog (lsINFO, ConnectionPool) << "Pool: Low water: already connected."; + WriteLog (lsINFO, Peers) << "Pool: Low water: already connected."; } // Check if we need more. @@ -202,7 +302,7 @@ void ConnectionPool::policyLowWater() } } -void ConnectionPool::policyEnforce() +void Peers::policyEnforce() { // Cancel any in progress timer. (void) mPolicyTimer.cancel(); @@ -212,16 +312,16 @@ void ConnectionPool::policyEnforce() if (((++mPhase) % 12) == 0) { - WriteLog (lsTRACE, ConnectionPool) << "Making configured connections"; + WriteLog (lsTRACE, Peers) << "Making configured connections"; makeConfigured(); } // Schedule next enforcement. mPolicyTimer.expires_at(boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS)); - mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1)); + mPolicyTimer.async_wait(boost::bind(&Peers::policyHandler, this, _1)); } -void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) +void Peers::policyHandler(const boost::system::error_code& ecResult) { if (ecResult == boost::asio::error::operation_aborted) { @@ -239,7 +339,7 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) // YYY: Should probably do this in the background. // YYY: Might end up sending to disconnected peer? -int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg) +int Peers::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg) { int sentTo = 0; std::vector peerVector = getPeerVector(); @@ -255,7 +355,7 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m return sentTo; } -void ConnectionPool::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) +void Peers::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) { // Relay message to all but the specified peers std::vector peerVector = getPeerVector(); BOOST_FOREACH(Peer::ref peer, peerVector) @@ -266,7 +366,7 @@ void ConnectionPool::relayMessageBut(const std::set& fromPeers, const Pa } -void ConnectionPool::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) +void Peers::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) { // Relay message to the specified peers std::vector peerVector = getPeerVector(); BOOST_FOREACH(Peer::ref peer, peerVector) @@ -281,7 +381,7 @@ void ConnectionPool::relayMessageTo(const std::set& fromPeers, const Pac // // Add or modify into PeerIps as a manual entry for immediate scanning. // Requires sane IP and port. -void ConnectionPool::connectTo(const std::string& strIp, int iPort) +void Peers::connectTo(const std::string& strIp, int iPort) { { Database* db = theApp->getWalletDB()->getDB(); @@ -299,7 +399,7 @@ void ConnectionPool::connectTo(const std::string& strIp, int iPort) // Start a connection, if not already known connected or connecting. // // <-- true, if already connected. -Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort) +Peer::pointer Peers::peerConnect(const std::string& strIp, int iPort) { ipPort pipPeer = make_pair(strIp, iPort); Peer::pointer ppResult; @@ -322,18 +422,18 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort) if (ppResult) { ppResult->connect(strIp, iPort); - WriteLog (lsDEBUG, ConnectionPool) << "Pool: Connecting: " << strIp << " " << iPort; + WriteLog (lsDEBUG, Peers) << "Pool: Connecting: " << strIp << " " << iPort; } else { - WriteLog (lsTRACE, ConnectionPool) << "Pool: Already connected: " << strIp << " " << iPort; + WriteLog (lsTRACE, Peers) << "Pool: Already connected: " << strIp << " " << iPort; } return ppResult; } // Returns information on verified peers. -Json::Value ConnectionPool::getPeersJson() +Json::Value Peers::getPeersJson() { Json::Value ret(Json::arrayValue); std::vector vppPeers = getPeerVector(); @@ -346,14 +446,14 @@ Json::Value ConnectionPool::getPeersJson() return ret; } -int ConnectionPool::getPeerCount() +int Peers::getPeerCount() { boost::recursive_mutex::scoped_lock sl(mPeerLock); return mConnectedMap.size(); } -std::vector ConnectionPool::getPeerVector() +std::vector Peers::getPeerVector() { std::vector ret; @@ -370,7 +470,7 @@ std::vector ConnectionPool::getPeerVector() return ret; } -uint64 ConnectionPool::assignPeerId() +uint64 Peers::assignPeerId() { boost::recursive_mutex::scoped_lock sl(mPeerLock); return ++mLastPeer; @@ -378,7 +478,7 @@ uint64 ConnectionPool::assignPeerId() // Now know peer's node public key. Determine if we want to stay connected. // <-- bNew: false = redundant -bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, +bool Peers::peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) { bool bNew = false; @@ -387,7 +487,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, if (naPeer == theApp->getWallet().getNodePublic()) { - WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; + WriteLog (lsINFO, Peers) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; } else { @@ -397,7 +497,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, if (itCm == mConnectedMap.end()) { // New connection. - //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort; mConnectedMap[naPeer] = peer; bNew = true; @@ -413,7 +513,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, if (itCm->second->getIP().empty()) { // Old peer did not know it's IP. - //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort; itCm->second->setIpPort(strIP, iPort); @@ -423,14 +523,14 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, else { // Old peer knew its IP. Do nothing. - //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; nothing(); } } else { - //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort; nothing(); } @@ -440,7 +540,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, } // We maintain a map of public key to peer for connected and verified peers. Maintain it. -void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPeer) +void Peers::peerDisconnected(Peer::ref peer, const RippleAddress& naPeer) { boost::recursive_mutex::scoped_lock sl(mPeerLock); @@ -451,12 +551,12 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee if (itCm == mConnectedMap.end()) { // Did not find it. Not already connecting or connected. - WriteLog (lsWARNING, ConnectionPool) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent."; + WriteLog (lsWARNING, Peers) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent."; // XXX Maybe bad error, considering we have racing connections, may not so bad. } else if (itCm->second != peer) { - WriteLog (lsWARNING, ConnectionPool) << "Pool: disconected: non canonical entry"; + WriteLog (lsWARNING, Peers) << "Pool: disconected: non canonical entry"; nothing(); } @@ -465,12 +565,12 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee // Found it. Delete it. mConnectedMap.erase(itCm); - //WriteLog (lsINFO, ConnectionPool) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort(); + //WriteLog (lsINFO, Peers) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort(); } } else { - //WriteLog (lsINFO, ConnectionPool) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort(); + //WriteLog (lsINFO, Peers) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort(); } assert(peer->getPeerId() != 0); @@ -480,7 +580,7 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee // Schedule for immediate scanning, if not already scheduled. // // <-- true, scanRefresh needed. -bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) +bool Peers::peerScanSet(const std::string& strIp, int iPort) { std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); bool bScanDirty = false; @@ -499,7 +599,7 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval); - //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)") + //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)") // % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds()); db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;") @@ -515,21 +615,21 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort) // boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time(); // boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext")); - //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)") + //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)") // % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds()); } db->endIterRows(); } else { - //WriteLog (lsWARNING, ConnectionPool) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort; + //WriteLog (lsWARNING, Peers) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort; } return bScanDirty; } // --> strIp: not empty -void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iPort) +void Peers::peerClosed(Peer::ref peer, const std::string& strIp, int iPort) { ipPort ipPeer = make_pair(strIp, iPort); bool bScanRefresh = false; @@ -537,7 +637,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP // If the connection was our scan, we are no longer scanning. if (mScanning && mScanning == peer) { - //WriteLog (lsINFO, ConnectionPool) << "Pool: Scan: scan fail: " << strIp << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Scan: scan fail: " << strIp << " " << iPort; mScanning.reset(); // No longer scanning. bScanRefresh = true; // Look for more to scan. @@ -552,13 +652,13 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP if (itIp == mIpMap.end()) { // Did not find it. Not already connecting or connected. - WriteLog (lsWARNING, ConnectionPool) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + WriteLog (lsWARNING, Peers) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; // XXX Internal error. } else if (mIpMap[ipPeer] == peer) { // We were the identified connection. - //WriteLog (lsINFO, ConnectionPool) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; // Delete our entry. mIpMap.erase(itIp); @@ -568,7 +668,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP else { // Found it. But, we were redundant. - //WriteLog (lsINFO, ConnectionPool) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; + //WriteLog (lsINFO, Peers) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort; } } @@ -582,7 +682,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP scanRefresh(); } -void ConnectionPool::peerVerified(Peer::ref peer) +void Peers::peerVerified(Peer::ref peer) { if (mScanning && mScanning == peer) { @@ -592,7 +692,7 @@ void ConnectionPool::peerVerified(Peer::ref peer) std::string strIpPort = str(boost::format("%s %d") % strIp % iPort); - //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: connected: %s %s %s (scanned)") % ADDRESS_SHARED(peer) % strIp % iPort); + //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: connected: %s %s %s (scanned)") % ADDRESS_SHARED(peer) % strIp % iPort); if (peer->getNodePublic() == theApp->getWallet().getNodePublic()) { @@ -617,7 +717,7 @@ void ConnectionPool::peerVerified(Peer::ref peer) } } -void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) +void Peers::scanHandler(const boost::system::error_code& ecResult) { if (ecResult == boost::asio::error::operation_aborted) { @@ -633,7 +733,7 @@ void ConnectionPool::scanHandler(const boost::system::error_code& ecResult) } } -void ConnectionPool::makeConfigured() +void Peers::makeConfigured() { if (theConfig.RUN_STANDALONE) return; @@ -648,7 +748,7 @@ void ConnectionPool::makeConfigured() } // Scan ips as per db entries. -void ConnectionPool::scanRefresh() +void Peers::scanRefresh() { if (theConfig.RUN_STANDALONE) { @@ -657,7 +757,7 @@ void ConnectionPool::scanRefresh() else if (mScanning) { // Currently scanning, will scan again after completion. - WriteLog (lsTRACE, ConnectionPool) << "Pool: Scan: already scanning"; + WriteLog (lsTRACE, Peers) << "Pool: Scan: already scanning"; nothing(); } @@ -695,7 +795,7 @@ void ConnectionPool::scanRefresh() if (tpNow.is_not_a_date_time()) { - //WriteLog (lsINFO, ConnectionPool) << "Pool: Scan: stop."; + //WriteLog (lsINFO, Peers) << "Pool: Scan: stop."; (void) mScanTimer.cancel(); } @@ -710,7 +810,7 @@ void ConnectionPool::scanRefresh() tpNext = tpNow + boost::posix_time::seconds(iInterval); - //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%d)") + //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%d)") // % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds()); iInterval *= 2; @@ -735,17 +835,22 @@ void ConnectionPool::scanRefresh() } else { - //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%d)") + //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%d)") // % strIpPort % tpNext % (tpNext-tpNow).total_seconds()); mScanTimer.expires_at(tpNext); - mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1)); + mScanTimer.async_wait(boost::bind(&Peers::scanHandler, this, _1)); } } } +IPeers* IPeers::New (boost::asio::io_service& io_service) +{ + return new Peers (io_service); +} + #if 0 -bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg) +bool Peers::isMessageKnown(PackedMessage::pointer msg) { for(unsigned int n=0; ngetConnectionPool().scanRefresh(); + theApp->getPeers().scanRefresh(); } }