diff --git a/modules/ripple_data/ripple_data.h b/modules/ripple_data/ripple_data.h
index a41a74bc6..2d8a5a38d 100644
--- a/modules/ripple_data/ripple_data.h
+++ b/modules/ripple_data/ripple_data.h
@@ -57,13 +57,13 @@
// VFALCO: TODO, try to reduce these dependencies
#include "../ripple_basics/ripple_basics.h"
-// VFALCO: TODO, resolve the location of this file
+// VFALCO: TODO, figure out a good place for this file, perhaps give it some
+// additional hierarchy via directories.
#include "ripple.pb.h"
#include "crypto/ripple_CBigNum.h"
#include "crypto/ripple_Base58.h" // VFALCO: TODO, Can be moved to .cpp if we clean up setAlphabet stuff
#include "crypto/ripple_Base58Data.h"
-// #include "src/cpp/ripple/ProofOfWork.h"
#include "protocol/ripple_FieldNames.h"
#include "protocol/ripple_PackedMessage.h"
diff --git a/modules/ripple_main/ripple_main.cpp b/modules/ripple_main/ripple_main.cpp
index 7592eb98a..4bc5f084d 100644
--- a/modules/ripple_main/ripple_main.cpp
+++ b/modules/ripple_main/ripple_main.cpp
@@ -111,6 +111,7 @@
#include "src/cpp/ripple/ripple_IHashRouter.h"
#include "src/cpp/ripple/ripple_ILoadFeeTrack.h"
#include "src/cpp/ripple/ripple_Peer.h" // VFALCO: TODO Rename to IPeer
+#include "src/cpp/ripple/ripple_IPeers.h"
#include "src/cpp/ripple/ripple_IProofOfWorkFactory.h"
#include "src/cpp/ripple/ripple_IUniqueNodeList.h"
#include "src/cpp/ripple/ripple_IValidations.h"
@@ -119,9 +120,6 @@
//
//------------------------------------------------------------------------------
-
-
-
// VFALCO: NOTE, Order matters! If you get compile errors, move just 1
// include upwards as little as possible to fix it.
//
@@ -139,7 +137,6 @@
#include "src/cpp/ripple/CallRPC.h"
#include "src/cpp/ripple/CanonicalTXSet.h"
#include "src/cpp/ripple/ChangeTransactor.h"
-#include "src/cpp/ripple/ConnectionPool.h"
#include "src/cpp/ripple/FeatureTable.h"
#include "src/cpp/ripple/HTTPRequest.h"
#include "src/cpp/ripple/HashPrefixes.h"
@@ -226,7 +223,6 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength)
#include "src/cpp/ripple/CallRPC.cpp"
#include "src/cpp/ripple/CanonicalTXSet.cpp"
#include "src/cpp/ripple/ChangeTransactor.cpp" // no log
-#include "src/cpp/ripple/ConnectionPool.cpp"
#include "src/cpp/ripple/Contract.cpp" // no log
#include "src/cpp/ripple/DBInit.cpp"
#include "src/cpp/ripple/HashedObject.cpp"
@@ -305,6 +301,7 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength)
#include "src/cpp/ripple/ripple_LogWebsockets.cpp"
#include "src/cpp/ripple/ripple_LoadFeeTrack.cpp"
#include "src/cpp/ripple/ripple_Peer.cpp"
+#include "src/cpp/ripple/ripple_Peers.cpp"
#include "src/cpp/ripple/ripple_ProofOfWork.cpp"
#include "src/cpp/ripple/ripple_ProofOfWorkFactory.cpp"
#include "src/cpp/ripple/ripple_Validations.cpp"
diff --git a/newcoin.vcxproj b/newcoin.vcxproj
index 7073dfb2e..84ad582e9 100644
--- a/newcoin.vcxproj
+++ b/newcoin.vcxproj
@@ -758,7 +758,7 @@
true
true
-
+
true
true
true
@@ -1645,7 +1645,7 @@
-
+
diff --git a/newcoin.vcxproj.filters b/newcoin.vcxproj.filters
index 38d612d8b..a521911fb 100644
--- a/newcoin.vcxproj.filters
+++ b/newcoin.vcxproj.filters
@@ -600,9 +600,6 @@
1. Modules\ripple_main\_unfactored\main
-
- 1. Modules\ripple_main\_unfactored\network
-
1. Modules\ripple_main\_unfactored\network
@@ -813,6 +810,9 @@
1. Modules\ripple_main\refactored
+
+ 1. Modules\ripple_main\refactored
+
@@ -1310,9 +1310,6 @@
1. Modules\ripple_main\_unfactored\network
-
- 1. Modules\ripple_main\_unfactored\network
-
1. Modules\ripple_main\_unfactored\network
@@ -1511,6 +1508,9 @@
1. Modules\ripple_main\refactored
+
+ 1. Modules\ripple_main\refactored
+
diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp
index 291aedba3..04eaeb41f 100644
--- a/src/cpp/ripple/Application.cpp
+++ b/src/cpp/ripple/Application.cpp
@@ -48,6 +48,7 @@ Application::Application ()
, mValidations (IValidations::New ())
, mUNL (IUniqueNodeList::New (mIOService))
, mProofOfWorkFactory (IProofOfWorkFactory::New ())
+ , mPeers (IPeers::New (mIOService))
// VFALCO: End new stuff
// VFALCO: TODO replace all NULL with nullptr
, mRpcDB (NULL)
@@ -61,8 +62,7 @@ Application::Application ()
#ifdef USE_LEVELDB
, mHashNodeLDB (NULL)
#endif
- , mConnectionPool (mIOService)
- , mPeerDoor (NULL)
+ , mPeerDoor (NULL)
, mRPCDoor (NULL)
, mWSPublicDoor (NULL)
, mWSPrivateDoor (NULL)
@@ -369,7 +369,7 @@ void Application::setup()
// Begin connecting to network.
//
if (!theConfig.RUN_STANDALONE)
- mConnectionPool.start();
+ mPeers->start();
if (theConfig.RUN_STANDALONE)
diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h
index 33172a3f0..5ab15c2b3 100644
--- a/src/cpp/ripple/Application.h
+++ b/src/cpp/ripple/Application.h
@@ -8,7 +8,6 @@
#include "../database/database.h"
#include "LedgerMaster.h"
-#include "ConnectionPool.h"
#include "LedgerAcquire.h"
#include "TransactionMaster.h"
#include "Wallet.h"
@@ -30,6 +29,7 @@ class ILoadFeeTrack;
class IValidations;
class IUniqueNodeList;
class IProofOfWorkFactory;
+class IPeers;
class RPCDoor;
class PeerDoor;
@@ -38,65 +38,10 @@ typedef TaggedCache< uint256, SLE, UptimeTimerAdapter> SLECache;
class Application
{
- boost::asio::io_service mIOService, mAuxService;
- boost::asio::io_service::work mIOWork, mAuxWork;
-
- boost::recursive_mutex mMasterLock;
-
- Wallet mWallet;
- LedgerMaster mLedgerMaster;
- LedgerAcquireMaster mMasterLedgerAcquire;
- TransactionMaster mMasterTransaction;
- NetworkOPs mNetOps;
- NodeCache mTempNodeCache;
- HashedObjectStore mHashedObjectStore;
- SLECache mSLECache;
- SNTPClient mSNTPClient;
- JobQueue mJobQueue;
- LoadManager mLoadMgr;
- TXQueue mTxnQueue;
- OrderBookDB mOrderBookDB;
-
- // VFALCO: Clean stuff
- beast::ScopedPointer mFeatures;
- beast::ScopedPointer mFeeVote;
- beast::ScopedPointer mFeeTrack;
- beast::ScopedPointer mHashRouter;
- beast::ScopedPointer mValidations;
- beast::ScopedPointer mUNL;
- beast::ScopedPointer mProofOfWorkFactory;
- // VFALCO: End Clean stuff
-
- DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB;
-
- leveldb::DB *mHashNodeLDB;
- leveldb::DB *mEphemeralLDB;
-
- ConnectionPool mConnectionPool;
- PeerDoor* mPeerDoor;
- RPCDoor* mRPCDoor;
- WSDoor* mWSPublicDoor;
- WSDoor* mWSPrivateDoor;
-
- boost::asio::deadline_timer mSweepTimer;
-
- std::map mPeerMap;
- boost::recursive_mutex mPeerMapLock;
-
- volatile bool mShutdown;
-
- void updateTables(bool);
- void startNewLedger();
- bool loadOldLedger(const std::string&);
-
public:
Application();
~Application();
- ConnectionPool& getConnectionPool() { return mConnectionPool; }
-
- IUniqueNodeList& getUNL() { return *mUNL; }
-
Wallet& getWallet() { return mWallet ; }
NetworkOPs& getOPs() { return mNetOps; }
@@ -121,7 +66,9 @@ public:
IFeeVote& getFeeVote() { return *mFeeVote; }
IHashRouter& getHashRouter() { return *mHashRouter; }
IValidations& getValidations() { return *mValidations; }
+ IUniqueNodeList& getUNL() { return *mUNL; }
IProofOfWorkFactory& getProofOfWorkFactory() { return *mProofOfWorkFactory; }
+ IPeers& getPeers () { return *mPeers; }
// VFALCO: TODO, Move these to the .cpp
bool running() { return mTxnDB != NULL; } // VFALCO: TODO, replace with nullptr when beast is available
@@ -144,12 +91,59 @@ public:
void stop();
void sweep();
-#ifdef DEBUG
- void mustHaveMasterLock() { bool tl = mMasterLock.try_lock(); assert(tl); mMasterLock.unlock(); }
-#else
- void mustHaveMasterLock() { ; }
-#endif
+private:
+ boost::asio::io_service mIOService;
+ boost::asio::io_service mAuxService;
+ boost::asio::io_service::work mIOWork;
+ boost::asio::io_service::work mAuxWork;
+ boost::recursive_mutex mMasterLock;
+
+ Wallet mWallet;
+ LedgerMaster mLedgerMaster;
+ LedgerAcquireMaster mMasterLedgerAcquire;
+ TransactionMaster mMasterTransaction;
+ NetworkOPs mNetOps;
+ NodeCache mTempNodeCache;
+ HashedObjectStore mHashedObjectStore;
+ SLECache mSLECache;
+ SNTPClient mSNTPClient;
+ JobQueue mJobQueue;
+ LoadManager mLoadMgr;
+ TXQueue mTxnQueue;
+ OrderBookDB mOrderBookDB;
+
+ // VFALCO: Clean stuff
+ beast::ScopedPointer mFeatures;
+ beast::ScopedPointer mFeeVote;
+ beast::ScopedPointer mFeeTrack;
+ beast::ScopedPointer mHashRouter;
+ beast::ScopedPointer mValidations;
+ beast::ScopedPointer mUNL;
+ beast::ScopedPointer mProofOfWorkFactory;
+ beast::ScopedPointer mPeers;
+ // VFALCO: End Clean stuff
+
+ DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mNetNodeDB, *mPathFindDB, *mHashNodeDB;
+
+ leveldb::DB *mHashNodeLDB;
+ leveldb::DB *mEphemeralLDB;
+
+ PeerDoor* mPeerDoor;
+ RPCDoor* mRPCDoor;
+ WSDoor* mWSPublicDoor;
+ WSDoor* mWSPrivateDoor;
+
+ boost::asio::deadline_timer mSweepTimer;
+
+ std::map mPeerMap;
+ boost::recursive_mutex mPeerMapLock;
+
+ volatile bool mShutdown;
+
+ void updateTables(bool);
+ void startNewLedger();
+ bool loadOldLedger(const std::string&);
};
extern Application* theApp;
diff --git a/src/cpp/ripple/ConnectionPool.h b/src/cpp/ripple/ConnectionPool.h
deleted file mode 100644
index 503eccc42..000000000
--- a/src/cpp/ripple/ConnectionPool.h
+++ /dev/null
@@ -1,121 +0,0 @@
-#ifndef __CONNECTION_POOL__
-#define __CONNECTION_POOL__
-
-#include
-
-#include
-#include
-
-//
-// Access to the Ripple network.
-//
-class ConnectionPool
-{
-private:
- boost::recursive_mutex mPeerLock;
- uint64 mLastPeer;
- int mPhase;
-
- typedef std::pair naPeer;
- typedef std::pair pipPeer;
- typedef std::map::value_type vtPeer;
-
- // Peers we are connecting with and non-thin peers we are connected to.
- // Only peers we know the connection ip for are listed.
- // We know the ip and port for:
- // - All outbound connections
- // - Some inbound connections (which we figured out).
- boost::unordered_map mIpMap;
-
- // Non-thin peers which we are connected to.
- // Peers we have the public key for.
- typedef boost::unordered_map::value_type vtConMap;
- boost::unordered_map mConnectedMap;
-
- // Connections with have a 64-bit identifier
- boost::unordered_map mPeerIdMap;
-
- Peer::pointer mScanning;
- boost::asio::deadline_timer mScanTimer;
- std::string mScanIp;
- int mScanPort;
-
- void scanHandler(const boost::system::error_code& ecResult);
-
- boost::asio::deadline_timer mPolicyTimer;
-
- void policyHandler(const boost::system::error_code& ecResult);
-
- // Peers we are establishing a connection with as a client.
- // int miConnectStarting;
-
- bool peerAvailable(std::string& strIp, int& iPort);
- bool peerScanSet(const std::string& strIp, int iPort);
-
- Peer::pointer peerConnect(const std::string& strIp, int iPort);
-
-public:
- ConnectionPool(boost::asio::io_service& io_service) :
- mLastPeer(0), mPhase(0), mScanTimer(io_service), mPolicyTimer(io_service)
- { ; }
-
- // Begin enforcing connection policy.
- void start();
-
- // Send message to network.
- int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg);
- void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg);
- void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg);
-
- // Manual connection request.
- // Queue for immediate scanning.
- void connectTo(const std::string& strIp, int iPort);
-
- //
- // Peer connectivity notification.
- //
- bool getTopNAddrs(int n,std::vector& addrs);
- bool savePeer(const std::string& strIp, int iPort, char code);
-
- // We know peers node public key.
- // <-- bool: false=reject
- bool peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort);
-
- // No longer connected.
- void peerDisconnected(Peer::ref peer, const RippleAddress& naPeer);
-
- // As client accepted.
- void peerVerified(Peer::ref peer);
-
- // As client failed connect and be accepted.
- void peerClosed(Peer::ref peer, const std::string& strIp, int iPort);
-
- int getPeerCount();
- Json::Value getPeersJson();
- std::vector getPeerVector();
-
- // Peer 64-bit ID function
- uint64 assignPeerId();
- Peer::pointer getPeerById(const uint64& id);
- bool hasPeer(const uint64& id);
-
- //
- // Scanning
- //
-
- void scanRefresh();
-
- //
- // Connection policy
- //
- void policyLowWater();
- void policyEnforce();
-
- // configured connections
- void makeConfigured();
-};
-
-extern void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort);
-
-#endif
-// vim:ts=4
diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp
index 3c528a5b1..204396257 100644
--- a/src/cpp/ripple/LedgerAcquire.cpp
+++ b/src/cpp/ripple/LedgerAcquire.cpp
@@ -242,7 +242,7 @@ void LedgerAcquire::noAwaitData()
void LedgerAcquire::addPeers()
{
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
int vSize = peerList.size();
if (vSize == 0)
@@ -399,7 +399,7 @@ void LedgerAcquire::trigger(Peer::ref peer)
for (boost::unordered_map::iterator it = mPeers.begin(), end = mPeers.end();
it != end; ++it)
{
- Peer::pointer iPeer = theApp->getConnectionPool().getPeerById(it->first);
+ Peer::pointer iPeer = theApp->getPeers().getPeerById(it->first);
if (iPeer)
{
mByHash = false;
@@ -554,7 +554,7 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL)
PackedMessage::pointer packet = boost::make_shared(tmGL, ripple::mtGET_LEDGER);
for (boost::unordered_map::iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it)
{
- Peer::pointer peer = theApp->getConnectionPool().getPeerById(it->first);
+ Peer::pointer peer = theApp->getPeers().getPeerById(it->first);
if (peer)
peer->sendPacket(packet, false);
}
@@ -577,7 +577,7 @@ int PeerSet::getPeerCount() const
{
int ret = 0;
for (boost::unordered_map::const_iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it)
- if (theApp->getConnectionPool().hasPeer(it->first))
+ if (theApp->getPeers().hasPeer(it->first))
++ret;
return ret;
}
diff --git a/src/cpp/ripple/LedgerConsensus.cpp b/src/cpp/ripple/LedgerConsensus.cpp
index 1e2c8e522..d4589764b 100644
--- a/src/cpp/ripple/LedgerConsensus.cpp
+++ b/src/cpp/ripple/LedgerConsensus.cpp
@@ -194,7 +194,7 @@ void LedgerConsensus::checkOurValidation()
ripple::TMValidation val;
val.set_validation(&validation[0], validation.size());
#if 0
- theApp->getConnectionPool().relayMessage(NULL,
+ theApp->getPeers().relayMessage(NULL,
boost::make_shared(val, ripple::mtVALIDATION));
#endif
theApp->getOPs().setLastValidation(v);
@@ -450,7 +450,7 @@ void LedgerConsensus::sendHaveTxSet(const uint256& hash, bool direct)
msg.set_hash(hash.begin(), 256 / 8);
msg.set_status(direct ? ripple::tsHAVE : ripple::tsCAN_GET);
PackedMessage::pointer packet = boost::make_shared(msg, ripple::mtHAVE_SET);
- theApp->getConnectionPool().relayMessage(NULL, packet);
+ theApp->getPeers().relayMessage(NULL, packet);
}
void LedgerConsensus::adjustCount(SHAMap::ref map, const std::vector& peers)
@@ -483,7 +483,7 @@ void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger)
s.set_lastseq(uMax);
PackedMessage::pointer packet = boost::make_shared(s, ripple::mtSTATUS_CHANGE);
- theApp->getConnectionPool().relayMessage(NULL, packet);
+ theApp->getPeers().relayMessage(NULL, packet);
WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer";
}
@@ -803,7 +803,7 @@ void LedgerConsensus::startAcquiring(TransactionAcquire::pointer acquire)
}
}
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasTxSet(acquire->getHash()))
@@ -828,7 +828,7 @@ void LedgerConsensus::propose()
std::vector sig = mOurPosition->sign();
prop.set_nodepubkey(&pubKey[0], pubKey.size());
prop.set_signature(&sig[0], sig.size());
- theApp->getConnectionPool().relayMessage(NULL,
+ theApp->getPeers().relayMessage(NULL,
boost::make_shared(prop, ripple::mtPROPOSE_LEDGER));
}
@@ -866,7 +866,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
msg.set_status(ripple::tsNEW);
msg.set_receivetimestamp(theApp->getOPs().getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared(msg, ripple::mtTRANSACTION);
- theApp->getConnectionPool().relayMessage(NULL, packet);
+ theApp->getPeers().relayMessage(NULL, packet);
}
}
@@ -1016,7 +1016,7 @@ void LedgerConsensus::playbackProposals()
nodepubkey
signature
PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER);
- theApp->getConnectionPool().relayMessageBut(peers, message);
+ theApp->getPeers().relayMessageBut(peers, message);
}
#endif
}
@@ -1230,7 +1230,7 @@ void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer)
std::vector validation = v->getSigned();
ripple::TMValidation val;
val.set_validation(&validation[0], validation.size());
- int j = theApp->getConnectionPool().relayMessage(NULL,
+ int j = theApp->getPeers().relayMessage(NULL,
boost::make_shared(val, ripple::mtVALIDATION));
WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash << " to " << j << " peers";
}
diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp
index 2146ee577..194b6724c 100644
--- a/src/cpp/ripple/LedgerMaster.cpp
+++ b/src/cpp/ripple/LedgerMaster.cpp
@@ -315,7 +315,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
tmBH.set_query(true);
tmBH.set_seq(ledgerSeq);
tmBH.set_ledgerhash(ledgerHash.begin(), 32);
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
Peer::pointer target;
int count = 0;
diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp
index ea538ff43..0cb338053 100644
--- a/src/cpp/ripple/NetworkOPs.cpp
+++ b/src/cpp/ripple/NetworkOPs.cpp
@@ -302,7 +302,7 @@ void NetworkOPs::runTransactionQueue()
tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it
PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION);
- theApp->getConnectionPool().relayMessageBut(peers, packet);
+ theApp->getPeers().relayMessageBut(peers, packet);
}
}
@@ -402,7 +402,7 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it
PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION);
- theApp->getConnectionPool().relayMessageBut(peers, packet);
+ theApp->getPeers().relayMessageBut(peers, packet);
}
}
@@ -601,7 +601,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
theApp->getLoadManager().noDeadLock();
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
// do we have sufficient peers? If not, we are disconnected.
if (peerList.size() < theConfig.NETWORK_QUORUM)
@@ -639,7 +639,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
void NetworkOPs::tryStartConsensus()
{
uint256 networkClosed;
- bool ledgerChange = checkLastClosedLedger(theApp->getConnectionPool().getPeerVector(), networkClosed);
+ bool ledgerChange = checkLastClosedLedger(theApp->getPeers().getPeerVector(), networkClosed);
if (networkClosed.isZero())
return;
@@ -822,7 +822,7 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger, bool duringCo
hash = newLedger->getHash();
s.set_ledgerhash(hash.begin(), hash.size());
PackedMessage::pointer packet = boost::make_shared(s, ripple::mtSTATUS_CHANGE);
- theApp->getConnectionPool().relayMessage(NULL, packet);
+ theApp->getPeers().relayMessage(NULL, packet);
}
int NetworkOPs::beginConsensus(const uint256& networkClosed, Ledger::pointer closingLedger)
@@ -865,7 +865,7 @@ bool NetworkOPs::haveConsensusObject()
else
{ // we need to get into the consensus process
uint256 networkClosed;
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
bool ledgerChange = checkLastClosedLedger(peerList, networkClosed);
if (!ledgerChange)
{
@@ -923,7 +923,7 @@ void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal,
std::set peers;
theApp->getHashRouter().swapSet(proposal->getHashRouter(), peers, SF_RELAYED);
PackedMessage::pointer message = boost::make_shared(*set, ripple::mtPROPOSE_LEDGER);
- theApp->getConnectionPool().relayMessageBut(peers, message);
+ theApp->getPeers().relayMessageBut(peers, message);
}
else
WriteLog (lsINFO, NetworkOPs) << "Not relaying trusted proposal";
@@ -994,7 +994,7 @@ void NetworkOPs::mapComplete(const uint256& hash, SHAMap::ref map)
void NetworkOPs::endConsensus(bool correctLCL)
{
uint256 deadLedger = mLedgerMaster->getClosedLedger()->getParentHash();
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
BOOST_FOREACH(Peer::ref it, peerList)
if (it && (it->getClosedLedgerHash() == deadLedger))
{
@@ -1282,7 +1282,7 @@ Json::Value NetworkOPs::getServerInfo(bool human, bool admin)
if (fp != 0)
info["fetch_pack"] = Json::UInt(fp);
- info["peers"] = theApp->getConnectionPool().getPeerCount();
+ info["peers"] = theApp->getPeers().getPeerCount();
Json::Value lastClose = Json::objectValue;
lastClose["proposers"] = theApp->getOPs().getPreviousProposers();
diff --git a/src/cpp/ripple/PeerDoor.cpp b/src/cpp/ripple/PeerDoor.cpp
index e34278017..f91bca5e3 100644
--- a/src/cpp/ripple/PeerDoor.cpp
+++ b/src/cpp/ripple/PeerDoor.cpp
@@ -41,7 +41,7 @@ void PeerDoor::startListening()
Peer::pointer new_connection = Peer::New (
mAcceptor.get_io_service(),
mCtx,
- theApp->getConnectionPool().assignPeerId(),
+ theApp->getPeers().assignPeerId(),
true);
mAcceptor.async_accept(new_connection->getSocket(),
@@ -70,10 +70,12 @@ void PeerDoor::handleConnect(Peer::pointer new_connection,
mDelayTimer.async_wait(boost::bind(&PeerDoor::startListening, this));
}
else
+ {
startListening();
+ }
}
-void initSSLContext(boost::asio::ssl::context& context,
+void initSSLContext (boost::asio::ssl::context& context,
std::string key_file, std::string cert_file, std::string chain_file)
{
SSL_CTX* sslContext = context.native_handle();
@@ -95,6 +97,7 @@ void initSSLContext(boost::asio::ssl::context& context,
if (!chain_file.empty())
{
+ // VFALCO: Replace fopen() with RAII
FILE *f = fopen(chain_file.c_str(), "r");
if (!f)
throw std::runtime_error("Unable to open chain file");
diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp
index 4ccdf927b..5a874b4ce 100644
--- a/src/cpp/ripple/RPCHandler.cpp
+++ b/src/cpp/ripple/RPCHandler.cpp
@@ -625,7 +625,7 @@ Json::Value RPCHandler::doConnect(Json::Value jvRequest, int& cost, ScopedLock&
int iPort = jvRequest.isMember("port") ? jvRequest["port"].asInt() : -1;
// XXX Validate legal IP and port
- theApp->getConnectionPool().connectTo(strIp, iPort);
+ theApp->getPeers().connectTo(strIp, iPort);
return "connecting";
}
@@ -772,7 +772,7 @@ Json::Value RPCHandler::doPeers(Json::Value, int& cost, ScopedLock& MasterLockHo
{
Json::Value jvResult(Json::objectValue);
- jvResult["peers"] = theApp->getConnectionPool().getPeersJson();
+ jvResult["peers"] = theApp->getPeers().getPeersJson();
return jvResult;
}
diff --git a/src/cpp/ripple/TransactionAcquire.cpp b/src/cpp/ripple/TransactionAcquire.cpp
index 07342084d..ca7b7f689 100644
--- a/src/cpp/ripple/TransactionAcquire.cpp
+++ b/src/cpp/ripple/TransactionAcquire.cpp
@@ -63,7 +63,7 @@ void TransactionAcquire::onTimer(bool progress)
WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash();
bool found = false;
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasTxSet(getHash()))
diff --git a/src/cpp/ripple/ripple_IPeers.h b/src/cpp/ripple/ripple_IPeers.h
new file mode 100644
index 000000000..35f2aac45
--- /dev/null
+++ b/src/cpp/ripple/ripple_IPeers.h
@@ -0,0 +1,75 @@
+
+
+#ifndef RIPPLE_IPEERS_H
+#define RIPPLE_IPEERS_H
+
+/** Manages the set of connected peers.
+*/
+class IPeers
+{
+public:
+ static IPeers* New (boost::asio::io_service& io_service);
+
+ virtual ~IPeers () { }
+
+ // Begin enforcing connection policy.
+ virtual void start () = 0;
+
+ // Send message to network.
+ virtual int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg) = 0;
+ virtual void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) = 0;
+ virtual void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) = 0;
+
+ // Manual connection request.
+ // Queue for immediate scanning.
+ virtual void connectTo(const std::string& strIp, int iPort) = 0;
+
+ //
+ // Peer connectivity notification.
+ //
+ virtual bool getTopNAddrs(int n,std::vector& addrs) = 0;
+ virtual bool savePeer(const std::string& strIp, int iPort, char code) = 0;
+
+ // We know peers node public key.
+ // <-- bool: false=reject
+ virtual bool peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort) = 0;
+
+ // No longer connected.
+ virtual void peerDisconnected(Peer::ref peer, const RippleAddress& naPeer) = 0;
+
+ // As client accepted.
+ virtual void peerVerified(Peer::ref peer) = 0;
+
+ // As client failed connect and be accepted.
+ virtual void peerClosed(Peer::ref peer, const std::string& strIp, int iPort) = 0;
+
+ virtual int getPeerCount() = 0;
+ virtual Json::Value getPeersJson() = 0;
+ virtual std::vector getPeerVector() = 0;
+
+ // Peer 64-bit ID function
+ virtual uint64 assignPeerId() = 0;
+ virtual Peer::pointer getPeerById(const uint64& id) = 0;
+ virtual bool hasPeer(const uint64& id) = 0;
+
+ //
+ // Scanning
+ //
+
+ virtual void scanRefresh() = 0;
+
+ //
+ // Connection policy
+ //
+ virtual void policyLowWater() = 0;
+ virtual void policyEnforce() = 0; // VFALCO: This and others can be made private
+
+ // configured connections
+ virtual void makeConfigured() = 0;
+};
+
+// VFALCO: TODO Put this in some group of utilities
+extern void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort);
+
+#endif
+// vim:ts=4
diff --git a/src/cpp/ripple/ripple_Peer.cpp b/src/cpp/ripple/ripple_Peer.cpp
index 5357014ac..3cf80d8ed 100644
--- a/src/cpp/ripple/ripple_Peer.cpp
+++ b/src/cpp/ripple/ripple_Peer.cpp
@@ -240,7 +240,7 @@ void PeerImp::detach(const char *rsn, bool onIOStrand)
if (mNodePublic.isValid())
{
- theApp->getConnectionPool().peerDisconnected(shared_from_this(), mNodePublic);
+ theApp->getPeers().peerDisconnected(shared_from_this(), mNodePublic);
mNodePublic.clear(); // Be idempotent.
}
@@ -249,7 +249,7 @@ void PeerImp::detach(const char *rsn, bool onIOStrand)
{
// Connection might be part of scanning. Inform connect failed.
// Might need to scan. Inform connection closed.
- theApp->getConnectionPool().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second);
+ theApp->getPeers().peerClosed(shared_from_this(), mIpPort.first, mIpPort.second);
mIpPort.first.clear(); // Be idempotent.
}
@@ -897,10 +897,10 @@ void PeerImp::recvHello(ripple::TMHello& packet)
if (mClientConnect)
{
// If we connected due to scan, no longer need to scan.
- theApp->getConnectionPool().peerVerified(shared_from_this());
+ theApp->getPeers().peerVerified(shared_from_this());
}
- if (!theApp->getConnectionPool().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort()))
+ if (! theApp->getPeers().peerConnected(shared_from_this(), mNodePublic, getIP(), getPort()))
{ // Already connected, self, or some other reason.
WriteLog (lsINFO, Peer) << "Recv(Hello): Disconnect: Extraneous connection.";
}
@@ -926,7 +926,7 @@ void PeerImp::recvHello(ripple::TMHello& packet)
// Don't save IP address if the node wants privacy.
// Note: We don't go so far as to delete it. If a node which has previously announced itself now wants
// privacy, it should at least change its port.
- theApp->getConnectionPool().savePeer(strIP, iPort, IUniqueNodeList::vsInbound);
+ theApp->getPeers().savePeer(strIP, iPort, IUniqueNodeList::vsInbound);
}
}
@@ -1100,7 +1100,7 @@ static void checkPropose(Job& job, boost::shared_ptr packe
std::set peers;
theApp->getHashRouter().swapSet(proposal->getHashRouter(), peers, SF_RELAYED);
PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER);
- theApp->getConnectionPool().relayMessageBut(peers, message);
+ theApp->getPeers().relayMessageBut(peers, message);
}
else
WriteLog (lsDEBUG, Peer) << "Not relaying untrusted proposal";
@@ -1208,7 +1208,7 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
theApp->getHashRouter().swapSet(signingHash, peers, SF_RELAYED))
{
PackedMessage::pointer message = boost::make_shared(*packet, ripple::mtVALIDATION);
- theApp->getConnectionPool().relayMessageBut(peers, message);
+ theApp->getPeers().relayMessageBut(peers, message);
}
}
#ifndef TRUST_NETWORK
@@ -1279,7 +1279,7 @@ void PeerImp::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHol
MasterLockHolder.unlock();
std::vector addrs;
- theApp->getConnectionPool().getTopNAddrs(30, addrs);
+ theApp->getPeers().getTopNAddrs(30, addrs);
if (!addrs.empty())
{
@@ -1321,7 +1321,7 @@ void PeerImp::recvPeers(ripple::TMPeers& packet)
{
//WriteLog (lsINFO, Peer) << "Peer: Learning: " << ADDRESS(this) << ": " << i << ": " << strIP << " " << iPort;
- theApp->getConnectionPool().savePeer(strIP, iPort, IUniqueNodeList::vsTold);
+ theApp->getPeers().savePeer(strIP, iPort, IUniqueNodeList::vsTold);
}
}
}
@@ -1584,7 +1584,7 @@ void PeerImp::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockH
if (packet.has_querytype() && !packet.has_requestcookie())
{
WriteLog (lsDEBUG, Peer) << "Trying to route TX set request";
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
std::vector usablePeers;
BOOST_FOREACH(Peer::ref peer, peerList)
{
@@ -1634,7 +1634,7 @@ void PeerImp::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockH
uint32 seq = 0;
if (packet.has_ledgerseq())
seq = packet.ledgerseq();
- std::vector peerList = theApp->getConnectionPool().getPeerVector();
+ std::vector peerList = theApp->getPeers().getPeerVector();
std::vector usablePeers;
BOOST_FOREACH(Peer::ref peer, peerList)
{
@@ -1816,7 +1816,7 @@ void PeerImp::recvLedger(const boost::shared_ptr& packet_p
if (packet.has_requestcookie())
{
- Peer::pointer target = theApp->getConnectionPool().getPeerById(packet.requestcookie());
+ Peer::pointer target = theApp->getPeers().getPeerById(packet.requestcookie());
if (target)
{
packet.clear_requestcookie();
diff --git a/src/cpp/ripple/ConnectionPool.cpp b/src/cpp/ripple/ripple_Peers.cpp
similarity index 65%
rename from src/cpp/ripple/ConnectionPool.cpp
rename to src/cpp/ripple/ripple_Peers.cpp
index 5ccc49157..28313f3e9 100644
--- a/src/cpp/ripple/ConnectionPool.cpp
+++ b/src/cpp/ripple/ripple_Peers.cpp
@@ -1,23 +1,123 @@
-#include "ConnectionPool.h"
-
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include "PeerDoor.h"
-#include "Application.h"
-
// VFALCO: TODO, make this an inline function
#define ADDRESS_SHARED(p) strHex(uint64( ((char*) (p).get()) - ((char*) 0)))
// How often to enforce policies.
#define POLICY_INTERVAL_SECONDS 5
-SETUP_LOG (ConnectionPool)
+class Peers;
+
+SETUP_LOG (Peers)
+
+class Peers : public IPeers
+{
+public:
+ explicit Peers (boost::asio::io_service& io_service)
+ : mLastPeer (0)
+ , mPhase (0)
+ , mScanTimer (io_service)
+ , mPolicyTimer (io_service)
+ {
+ }
+
+ // Begin enforcing connection policy.
+ void start();
+
+ // Send message to network.
+ int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg);
+ void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg);
+ void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg);
+
+ // Manual connection request.
+ // Queue for immediate scanning.
+ void connectTo(const std::string& strIp, int iPort);
+
+ //
+ // Peer connectivity notification.
+ //
+ bool getTopNAddrs(int n,std::vector& addrs);
+ bool savePeer(const std::string& strIp, int iPort, char code);
+
+ // We know peers node public key.
+ // <-- bool: false=reject
+ bool peerConnected(Peer::ref peer, const RippleAddress& naPeer, const std::string& strIP, int iPort);
+
+ // No longer connected.
+ void peerDisconnected(Peer::ref peer, const RippleAddress& naPeer);
+
+ // As client accepted.
+ void peerVerified(Peer::ref peer);
+
+ // As client failed connect and be accepted.
+ void peerClosed(Peer::ref peer, const std::string& strIp, int iPort);
+
+ int getPeerCount();
+ Json::Value getPeersJson();
+ std::vector getPeerVector();
+
+ // Peer 64-bit ID function
+ uint64 assignPeerId();
+ Peer::pointer getPeerById(const uint64& id);
+ bool hasPeer(const uint64& id);
+
+ //
+ // Scanning
+ //
+
+ void scanRefresh();
+
+ //
+ // Connection policy
+ //
+ void policyLowWater();
+ void policyEnforce();
+
+ // configured connections
+ void makeConfigured();
+
+private:
+ boost::recursive_mutex mPeerLock;
+ uint64 mLastPeer;
+ int mPhase;
+
+ typedef std::pair naPeer;
+ typedef std::pair pipPeer;
+ typedef std::map::value_type vtPeer;
+
+ // Peers we are connecting with and non-thin peers we are connected to.
+ // Only peers we know the connection ip for are listed.
+ // We know the ip and port for:
+ // - All outbound connections
+ // - Some inbound connections (which we figured out).
+ boost::unordered_map mIpMap;
+
+ // Non-thin peers which we are connected to.
+ // Peers we have the public key for.
+ typedef boost::unordered_map::value_type vtConMap;
+ boost::unordered_map mConnectedMap;
+
+ // Connections with have a 64-bit identifier
+ boost::unordered_map mPeerIdMap;
+
+ Peer::pointer mScanning;
+ boost::asio::deadline_timer mScanTimer;
+ std::string mScanIp;
+ int mScanPort;
+
+ void scanHandler(const boost::system::error_code& ecResult);
+
+ boost::asio::deadline_timer mPolicyTimer;
+
+ void policyHandler(const boost::system::error_code& ecResult);
+
+ // Peers we are establishing a connection with as a client.
+ // int miConnectStarting;
+
+ bool peerAvailable(std::string& strIp, int& iPort);
+ bool peerScanSet(const std::string& strIp, int iPort);
+
+ Peer::pointer peerConnect(const std::string& strIp, int iPort);
+};
void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort)
{
@@ -28,7 +128,7 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort)
iPort = boost::lexical_cast(vIpPort[1]);
}
-void ConnectionPool::start()
+void Peers::start()
{
if (theConfig.RUN_STANDALONE)
return;
@@ -40,7 +140,7 @@ void ConnectionPool::start()
scanRefresh();
}
-bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs)
+bool Peers::getTopNAddrs(int n,std::vector& addrs)
{
// XXX Filter out other local addresses (like ipv6)
Database* db = theApp->getWalletDB()->getDB();
@@ -58,7 +158,7 @@ bool ConnectionPool::getTopNAddrs(int n,std::vector& addrs)
return true;
}
-bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code)
+bool Peers::savePeer(const std::string& strIp, int iPort, char code)
{
bool bNew = false;
@@ -96,7 +196,7 @@ bool ConnectionPool::savePeer(const std::string& strIp, int iPort, char code)
return bNew;
}
-Peer::pointer ConnectionPool::getPeerById(const uint64& id)
+Peer::pointer Peers::getPeerById(const uint64& id)
{
boost::recursive_mutex::scoped_lock sl(mPeerLock);
const boost::unordered_map::iterator& it = mPeerIdMap.find(id);
@@ -105,7 +205,7 @@ Peer::pointer ConnectionPool::getPeerById(const uint64& id)
return it->second;
}
-bool ConnectionPool::hasPeer(const uint64& id)
+bool Peers::hasPeer(const uint64& id)
{
boost::recursive_mutex::scoped_lock sl(mPeerLock);
return mPeerIdMap.find(id) != mPeerIdMap.end();
@@ -115,7 +215,7 @@ bool ConnectionPool::hasPeer(const uint64& id)
// too.
//
// <-- true, if a peer is available to connect to
-bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
+bool Peers::peerAvailable(std::string& strIp, int& iPort)
{
Database* db = theApp->getWalletDB()->getDB();
std::vector vstrIpPort;
@@ -159,7 +259,7 @@ bool ConnectionPool::peerAvailable(std::string& strIp, int& iPort)
}
// Make sure we have at least low water connections.
-void ConnectionPool::policyLowWater()
+void Peers::policyLowWater()
{
std::string strIp;
int iPort;
@@ -168,7 +268,7 @@ void ConnectionPool::policyLowWater()
if (getPeerCount() > theConfig.PEER_CONNECT_LOW_WATER)
{
// Above low water mark, don't need more connections.
- WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER;
+ WriteLog (lsTRACE, Peers) << "Pool: Low water: sufficient connections: " << mConnectedMap.size() << "/" << theConfig.PEER_CONNECT_LOW_WATER;
nothing();
}
@@ -182,7 +282,7 @@ void ConnectionPool::policyLowWater()
else if (!peerAvailable(strIp, iPort))
{
// No more connections available to start.
- WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: no peers available.";
+ WriteLog (lsTRACE, Peers) << "Pool: Low water: no peers available.";
// XXX Might ask peers for more ips.
nothing();
@@ -190,11 +290,11 @@ void ConnectionPool::policyLowWater()
else
{
// Try to start connection.
- WriteLog (lsTRACE, ConnectionPool) << "Pool: Low water: start connection.";
+ WriteLog (lsTRACE, Peers) << "Pool: Low water: start connection.";
if (!peerConnect(strIp, iPort))
{
- WriteLog (lsINFO, ConnectionPool) << "Pool: Low water: already connected.";
+ WriteLog (lsINFO, Peers) << "Pool: Low water: already connected.";
}
// Check if we need more.
@@ -202,7 +302,7 @@ void ConnectionPool::policyLowWater()
}
}
-void ConnectionPool::policyEnforce()
+void Peers::policyEnforce()
{
// Cancel any in progress timer.
(void) mPolicyTimer.cancel();
@@ -212,16 +312,16 @@ void ConnectionPool::policyEnforce()
if (((++mPhase) % 12) == 0)
{
- WriteLog (lsTRACE, ConnectionPool) << "Making configured connections";
+ WriteLog (lsTRACE, Peers) << "Making configured connections";
makeConfigured();
}
// Schedule next enforcement.
mPolicyTimer.expires_at(boost::posix_time::second_clock::universal_time()+boost::posix_time::seconds(POLICY_INTERVAL_SECONDS));
- mPolicyTimer.async_wait(boost::bind(&ConnectionPool::policyHandler, this, _1));
+ mPolicyTimer.async_wait(boost::bind(&Peers::policyHandler, this, _1));
}
-void ConnectionPool::policyHandler(const boost::system::error_code& ecResult)
+void Peers::policyHandler(const boost::system::error_code& ecResult)
{
if (ecResult == boost::asio::error::operation_aborted)
{
@@ -239,7 +339,7 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult)
// YYY: Should probably do this in the background.
// YYY: Might end up sending to disconnected peer?
-int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg)
+int Peers::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg)
{
int sentTo = 0;
std::vector peerVector = getPeerVector();
@@ -255,7 +355,7 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m
return sentTo;
}
-void ConnectionPool::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg)
+void Peers::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg)
{ // Relay message to all but the specified peers
std::vector peerVector = getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerVector)
@@ -266,7 +366,7 @@ void ConnectionPool::relayMessageBut(const std::set& fromPeers, const Pa
}
-void ConnectionPool::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg)
+void Peers::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg)
{ // Relay message to the specified peers
std::vector peerVector = getPeerVector();
BOOST_FOREACH(Peer::ref peer, peerVector)
@@ -281,7 +381,7 @@ void ConnectionPool::relayMessageTo(const std::set& fromPeers, const Pac
//
// Add or modify into PeerIps as a manual entry for immediate scanning.
// Requires sane IP and port.
-void ConnectionPool::connectTo(const std::string& strIp, int iPort)
+void Peers::connectTo(const std::string& strIp, int iPort)
{
{
Database* db = theApp->getWalletDB()->getDB();
@@ -299,7 +399,7 @@ void ConnectionPool::connectTo(const std::string& strIp, int iPort)
// Start a connection, if not already known connected or connecting.
//
// <-- true, if already connected.
-Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
+Peer::pointer Peers::peerConnect(const std::string& strIp, int iPort)
{
ipPort pipPeer = make_pair(strIp, iPort);
Peer::pointer ppResult;
@@ -322,18 +422,18 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
if (ppResult)
{
ppResult->connect(strIp, iPort);
- WriteLog (lsDEBUG, ConnectionPool) << "Pool: Connecting: " << strIp << " " << iPort;
+ WriteLog (lsDEBUG, Peers) << "Pool: Connecting: " << strIp << " " << iPort;
}
else
{
- WriteLog (lsTRACE, ConnectionPool) << "Pool: Already connected: " << strIp << " " << iPort;
+ WriteLog (lsTRACE, Peers) << "Pool: Already connected: " << strIp << " " << iPort;
}
return ppResult;
}
// Returns information on verified peers.
-Json::Value ConnectionPool::getPeersJson()
+Json::Value Peers::getPeersJson()
{
Json::Value ret(Json::arrayValue);
std::vector vppPeers = getPeerVector();
@@ -346,14 +446,14 @@ Json::Value ConnectionPool::getPeersJson()
return ret;
}
-int ConnectionPool::getPeerCount()
+int Peers::getPeerCount()
{
boost::recursive_mutex::scoped_lock sl(mPeerLock);
return mConnectedMap.size();
}
-std::vector ConnectionPool::getPeerVector()
+std::vector Peers::getPeerVector()
{
std::vector ret;
@@ -370,7 +470,7 @@ std::vector ConnectionPool::getPeerVector()
return ret;
}
-uint64 ConnectionPool::assignPeerId()
+uint64 Peers::assignPeerId()
{
boost::recursive_mutex::scoped_lock sl(mPeerLock);
return ++mLastPeer;
@@ -378,7 +478,7 @@ uint64 ConnectionPool::assignPeerId()
// Now know peer's node public key. Determine if we want to stay connected.
// <-- bNew: false = redundant
-bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
+bool Peers::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
const std::string& strIP, int iPort)
{
bool bNew = false;
@@ -387,7 +487,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
if (naPeer == theApp->getWallet().getNodePublic())
{
- WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
+ WriteLog (lsINFO, Peers) << "Pool: Connected: self: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
}
else
{
@@ -397,7 +497,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
if (itCm == mConnectedMap.end())
{
// New connection.
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Connected: new: " << ADDRESS_SHARED(peer) << ": " << naPeer.humanNodePublic() << " " << strIP << " " << iPort;
mConnectedMap[naPeer] = peer;
bNew = true;
@@ -413,7 +513,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
if (itCm->second->getIP().empty())
{
// Old peer did not know it's IP.
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Connected: redundant: outbound: " << ADDRESS_SHARED(peer) << " discovered: " << ADDRESS_SHARED(itCm->second) << ": " << strIP << " " << iPort;
itCm->second->setIpPort(strIP, iPort);
@@ -423,14 +523,14 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
else
{
// Old peer knew its IP. Do nothing.
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Connected: redundant: outbound: rediscovered: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
}
else
{
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Connected: redundant: inbound: " << ADDRESS_SHARED(peer) << " " << strIP << " " << iPort;
nothing();
}
@@ -440,7 +540,7 @@ bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer,
}
// We maintain a map of public key to peer for connected and verified peers. Maintain it.
-void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPeer)
+void Peers::peerDisconnected(Peer::ref peer, const RippleAddress& naPeer)
{
boost::recursive_mutex::scoped_lock sl(mPeerLock);
@@ -451,12 +551,12 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee
if (itCm == mConnectedMap.end())
{
// Did not find it. Not already connecting or connected.
- WriteLog (lsWARNING, ConnectionPool) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent.";
+ WriteLog (lsWARNING, Peers) << "Pool: disconnected: Internal Error: mConnectedMap was inconsistent.";
// XXX Maybe bad error, considering we have racing connections, may not so bad.
}
else if (itCm->second != peer)
{
- WriteLog (lsWARNING, ConnectionPool) << "Pool: disconected: non canonical entry";
+ WriteLog (lsWARNING, Peers) << "Pool: disconected: non canonical entry";
nothing();
}
@@ -465,12 +565,12 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee
// Found it. Delete it.
mConnectedMap.erase(itCm);
- //WriteLog (lsINFO, ConnectionPool) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
+ //WriteLog (lsINFO, Peers) << "Pool: disconnected: " << naPeer.humanNodePublic() << " " << peer->getIP() << " " << peer->getPort();
}
}
else
{
- //WriteLog (lsINFO, ConnectionPool) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
+ //WriteLog (lsINFO, Peers) << "Pool: disconnected: anonymous: " << peer->getIP() << " " << peer->getPort();
}
assert(peer->getPeerId() != 0);
@@ -480,7 +580,7 @@ void ConnectionPool::peerDisconnected(Peer::ref peer, const RippleAddress& naPee
// Schedule for immediate scanning, if not already scheduled.
//
// <-- true, scanRefresh needed.
-bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
+bool Peers::peerScanSet(const std::string& strIp, int iPort)
{
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
bool bScanDirty = false;
@@ -499,7 +599,7 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime tpNext = tpNow + boost::posix_time::seconds(iInterval);
- //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)")
+ //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: schedule create: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
db->executeSQL(str(boost::format("UPDATE PeerIps SET ScanNext=%d,ScanInterval=%d WHERE IpPort=%s;")
@@ -515,21 +615,21 @@ bool ConnectionPool::peerScanSet(const std::string& strIp, int iPort)
// boost::posix_time::ptime tpNow = boost::posix_time::second_clock::universal_time();
// boost::posix_time::ptime tpNext = ptFromSeconds(db->getInt("ScanNext"));
- //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)")
+ //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: schedule exists: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
}
db->endIterRows();
}
else
{
- //WriteLog (lsWARNING, ConnectionPool) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort;
+ //WriteLog (lsWARNING, Peers) << "Pool: Scan: peer wasn't in PeerIps: " << strIp << " " << iPort;
}
return bScanDirty;
}
// --> strIp: not empty
-void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iPort)
+void Peers::peerClosed(Peer::ref peer, const std::string& strIp, int iPort)
{
ipPort ipPeer = make_pair(strIp, iPort);
bool bScanRefresh = false;
@@ -537,7 +637,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
// If the connection was our scan, we are no longer scanning.
if (mScanning && mScanning == peer)
{
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Scan: scan fail: " << strIp << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Scan: scan fail: " << strIp << " " << iPort;
mScanning.reset(); // No longer scanning.
bScanRefresh = true; // Look for more to scan.
@@ -552,13 +652,13 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
if (itIp == mIpMap.end())
{
// Did not find it. Not already connecting or connected.
- WriteLog (lsWARNING, ConnectionPool) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
+ WriteLog (lsWARNING, Peers) << "Pool: Closed: UNEXPECTED: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// XXX Internal error.
}
else if (mIpMap[ipPeer] == peer)
{
// We were the identified connection.
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Closed: identified: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
// Delete our entry.
mIpMap.erase(itIp);
@@ -568,7 +668,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
else
{
// Found it. But, we were redundant.
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
+ //WriteLog (lsINFO, Peers) << "Pool: Closed: redundant: " << ADDRESS_SHARED(peer) << ": " << strIp << " " << iPort;
}
}
@@ -582,7 +682,7 @@ void ConnectionPool::peerClosed(Peer::ref peer, const std::string& strIp, int iP
scanRefresh();
}
-void ConnectionPool::peerVerified(Peer::ref peer)
+void Peers::peerVerified(Peer::ref peer)
{
if (mScanning && mScanning == peer)
{
@@ -592,7 +692,7 @@ void ConnectionPool::peerVerified(Peer::ref peer)
std::string strIpPort = str(boost::format("%s %d") % strIp % iPort);
- //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: connected: %s %s %s (scanned)") % ADDRESS_SHARED(peer) % strIp % iPort);
+ //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: connected: %s %s %s (scanned)") % ADDRESS_SHARED(peer) % strIp % iPort);
if (peer->getNodePublic() == theApp->getWallet().getNodePublic())
{
@@ -617,7 +717,7 @@ void ConnectionPool::peerVerified(Peer::ref peer)
}
}
-void ConnectionPool::scanHandler(const boost::system::error_code& ecResult)
+void Peers::scanHandler(const boost::system::error_code& ecResult)
{
if (ecResult == boost::asio::error::operation_aborted)
{
@@ -633,7 +733,7 @@ void ConnectionPool::scanHandler(const boost::system::error_code& ecResult)
}
}
-void ConnectionPool::makeConfigured()
+void Peers::makeConfigured()
{
if (theConfig.RUN_STANDALONE)
return;
@@ -648,7 +748,7 @@ void ConnectionPool::makeConfigured()
}
// Scan ips as per db entries.
-void ConnectionPool::scanRefresh()
+void Peers::scanRefresh()
{
if (theConfig.RUN_STANDALONE)
{
@@ -657,7 +757,7 @@ void ConnectionPool::scanRefresh()
else if (mScanning)
{
// Currently scanning, will scan again after completion.
- WriteLog (lsTRACE, ConnectionPool) << "Pool: Scan: already scanning";
+ WriteLog (lsTRACE, Peers) << "Pool: Scan: already scanning";
nothing();
}
@@ -695,7 +795,7 @@ void ConnectionPool::scanRefresh()
if (tpNow.is_not_a_date_time())
{
- //WriteLog (lsINFO, ConnectionPool) << "Pool: Scan: stop.";
+ //WriteLog (lsINFO, Peers) << "Pool: Scan: stop.";
(void) mScanTimer.cancel();
}
@@ -710,7 +810,7 @@ void ConnectionPool::scanRefresh()
tpNext = tpNow + boost::posix_time::seconds(iInterval);
- //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%d)")
+ //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: Now: %s %s (next %s, delay=%d)")
// % mScanIp % mScanPort % tpNext % (tpNext-tpNow).total_seconds());
iInterval *= 2;
@@ -735,17 +835,22 @@ void ConnectionPool::scanRefresh()
}
else
{
- //WriteLog (lsINFO, ConnectionPool) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%d)")
+ //WriteLog (lsINFO, Peers) << str(boost::format("Pool: Scan: Next: %s (next %s, delay=%d)")
// % strIpPort % tpNext % (tpNext-tpNow).total_seconds());
mScanTimer.expires_at(tpNext);
- mScanTimer.async_wait(boost::bind(&ConnectionPool::scanHandler, this, _1));
+ mScanTimer.async_wait(boost::bind(&Peers::scanHandler, this, _1));
}
}
}
+IPeers* IPeers::New (boost::asio::io_service& io_service)
+{
+ return new Peers (io_service);
+}
+
#if 0
-bool ConnectionPool::isMessageKnown(PackedMessage::pointer msg)
+bool Peers::isMessageKnown(PackedMessage::pointer msg)
{
for(unsigned int n=0; ngetConnectionPool().scanRefresh();
+ theApp->getPeers().scanRefresh();
}
}