From d8c97c214965478005648e848b0841481da18e23 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 5 Jun 2013 08:38:27 -0700 Subject: [PATCH] Refactor Peer --- .../utility/ripple_InstanceCounter.h | 1 + modules/ripple_main/ripple_main.cpp | 16 +- newcoin.vcxproj | 14 +- newcoin.vcxproj.filters | 12 +- src/cpp/ripple/ConnectionPool.cpp | 6 +- src/cpp/ripple/Peer.h | 187 +++------ src/cpp/ripple/PeerDoor.cpp | 7 +- src/cpp/ripple/RPCHandler.cpp | 26 +- src/cpp/ripple/{Peer.cpp => ripple_Peer.cpp} | 383 ++++++++++++------ src/cpp/ripple/ripple_Peer.h | 95 +++++ 10 files changed, 460 insertions(+), 287 deletions(-) rename src/cpp/ripple/{Peer.cpp => ripple_Peer.cpp} (80%) create mode 100644 src/cpp/ripple/ripple_Peer.h diff --git a/modules/ripple_basics/utility/ripple_InstanceCounter.h b/modules/ripple_basics/utility/ripple_InstanceCounter.h index 422551399..2dad98407 100644 --- a/modules/ripple_basics/utility/ripple_InstanceCounter.h +++ b/modules/ripple_basics/utility/ripple_InstanceCounter.h @@ -5,6 +5,7 @@ // with a robust leak checker when we have atomics. // +// VFALCO: TODO, swap these. Declaration means header, definition means .cpp!!! #define DEFINE_INSTANCE(x) \ extern InstanceType IT_##x; \ class Instance_##x : private Instance \ diff --git a/modules/ripple_main/ripple_main.cpp b/modules/ripple_main/ripple_main.cpp index 239234f54..7592eb98a 100644 --- a/modules/ripple_main/ripple_main.cpp +++ b/modules/ripple_main/ripple_main.cpp @@ -25,6 +25,7 @@ //------------------------------------------------------------------------------ #include +#include #include #include #include @@ -41,13 +42,13 @@ #include #include #include +#include #include #include #include #include #include #include -//#include #include #include #include @@ -60,7 +61,6 @@ #include #include -//#include // is this needed? #include #include @@ -102,6 +102,7 @@ #include "src/cpp/ripple/ripple_DatabaseCon.h" #include "src/cpp/ripple/ripple_LoadEvent.h" #include "src/cpp/ripple/ripple_LoadMonitor.h" +#include "src/cpp/ripple/ripple_ProofOfWork.h" #include "src/cpp/ripple/ripple_Job.h" #include "src/cpp/ripple/ripple_JobQueue.h" @@ -109,6 +110,8 @@ #include "src/cpp/ripple/ripple_IFeeVote.h" #include "src/cpp/ripple/ripple_IHashRouter.h" #include "src/cpp/ripple/ripple_ILoadFeeTrack.h" +#include "src/cpp/ripple/ripple_Peer.h" // VFALCO: TODO Rename to IPeer +#include "src/cpp/ripple/ripple_IProofOfWorkFactory.h" #include "src/cpp/ripple/ripple_IUniqueNodeList.h" #include "src/cpp/ripple/ripple_IValidations.h" @@ -131,9 +134,6 @@ #include "src/cpp/ripple/AccountItems.h" #include "src/cpp/ripple/AccountSetTransactor.h" #include "src/cpp/ripple/AccountState.h" -#include "src/cpp/ripple/ripple_ProofOfWork.h" -#include "src/cpp/ripple/ripple_IProofOfWorkFactory.h" -#include "src/cpp/ripple/Peer.h" #include "src/cpp/ripple/Application.h" #include "src/cpp/ripple/AutoSocket.h" #include "src/cpp/ripple/CallRPC.h" @@ -255,11 +255,8 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength) #include "src/cpp/ripple/ParseSection.cpp" #include "src/cpp/ripple/Pathfinder.cpp" #include "src/cpp/ripple/PaymentTransactor.cpp" -#include "src/cpp/ripple/Peer.cpp" #include "src/cpp/ripple/PeerDoor.cpp" #include "src/cpp/ripple/PFRequest.cpp" -#include "src/cpp/ripple/ripple_ProofOfWork.cpp" -#include "src/cpp/ripple/ripple_ProofOfWorkFactory.cpp" #include "src/cpp/ripple/RegularKeySetTransactor.cpp" #include "src/cpp/ripple/RippleCalc.cpp" #include "src/cpp/ripple/RippleState.cpp" // no log @@ -307,6 +304,9 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength) #include "src/cpp/ripple/ripple_LoadMonitor.cpp" #include "src/cpp/ripple/ripple_LogWebsockets.cpp" #include "src/cpp/ripple/ripple_LoadFeeTrack.cpp" +#include "src/cpp/ripple/ripple_Peer.cpp" +#include "src/cpp/ripple/ripple_ProofOfWork.cpp" +#include "src/cpp/ripple/ripple_ProofOfWorkFactory.cpp" #include "src/cpp/ripple/ripple_Validations.cpp" #include "src/cpp/ripple/ripple_UniqueNodeList.cpp" diff --git a/newcoin.vcxproj b/newcoin.vcxproj index 140a2170e..7073dfb2e 100644 --- a/newcoin.vcxproj +++ b/newcoin.vcxproj @@ -954,12 +954,6 @@ true true - - true - true - true - true - true true @@ -1024,6 +1018,12 @@ true true + + true + true + true + true + true true @@ -1677,7 +1677,6 @@ - @@ -1690,6 +1689,7 @@ + diff --git a/newcoin.vcxproj.filters b/newcoin.vcxproj.filters index 59c56148f..38d612d8b 100644 --- a/newcoin.vcxproj.filters +++ b/newcoin.vcxproj.filters @@ -606,9 +606,6 @@ 1. Modules\ripple_main\_unfactored\network - - 1. Modules\ripple_main\_unfactored\network - 1. Modules\ripple_main\_unfactored\network @@ -813,6 +810,9 @@ 1. Modules\ripple_main\refactored + + 1. Modules\ripple_main\refactored + @@ -1316,9 +1316,6 @@ 1. Modules\ripple_main\_unfactored\network - - 1. Modules\ripple_main\_unfactored\network - 1. Modules\ripple_main\_unfactored\network @@ -1511,6 +1508,9 @@ 1. Modules\ripple_main\refactored + + 1. Modules\ripple_main\refactored + diff --git a/src/cpp/ripple/ConnectionPool.cpp b/src/cpp/ripple/ConnectionPool.cpp index 38da9be9b..5ccc49157 100644 --- a/src/cpp/ripple/ConnectionPool.cpp +++ b/src/cpp/ripple/ConnectionPool.cpp @@ -309,8 +309,10 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort) boost::recursive_mutex::scoped_lock sl(mPeerLock); if (mIpMap.find(pipPeer) == mIpMap.end()) { - ppResult = Peer::create(theApp->getIOService(), theApp->getPeerDoor().getSSLContext(), - ++mLastPeer, false); + ppResult = Peer::New (theApp->getIOService(), + theApp->getPeerDoor().getSSLContext(), + ++mLastPeer, + false); mIpMap[pipPeer] = ppResult; // ++miConnectStarting; diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index 53995ca99..28d5fb5a0 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -1,23 +1,9 @@ -#ifndef __PEER__ -#define __PEER__ +#ifndef RIPPLE_PEER_H +#define RIPPLE_PEER_H -#include -#include -#include -#include -#include -#include - -#include "Ledger.h" -#include "Transaction.h" -#include "LoadManager.h" - -typedef std::pair ipPort; - -DEFINE_INSTANCE(Peer); +typedef std::pair ipPort; class Peer : public boost::enable_shared_from_this - , public IS_INSTANCE (Peer) { public: typedef boost::shared_ptr pointer; @@ -32,141 +18,66 @@ public: static int const psbDownLevel = 6; public: - //bool operator == (const Peer& other); + static pointer New (boost::asio::io_service& io_service, + boost::asio::ssl::context& ctx, + uint64 id, + bool inbound); - void handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); + // VFALCO: TODO see if this and below can be private + virtual void handleConnect (const boost::system::error_code& error, + boost::asio::ip::tcp::resolver::iterator it) = 0; - std::string& getIP() { return mIpPort.first; } - std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; } - int getPort() { return mIpPort.second; } + virtual std::string& getIP () = 0; + + virtual std::string getDisplayName() = 0; + + virtual int getPort () = 0; - void setIpPort(const std::string& strIP, int iPort); + virtual void setIpPort (const std::string& strIP, int iPort) = 0; - static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id, bool inbound) - { - return pointer(new Peer(io_service, ctx, id, inbound)); - } + virtual boost::asio::ssl::stream::lowest_layer_type& getSocket() = 0; - boost::asio::ssl::stream::lowest_layer_type& getSocket() - { - return mSocketSsl.lowest_layer(); - } + virtual void connect (const std::string& strIp, int iPort) = 0; + + virtual void connected (const boost::system::error_code& error) = 0; + + virtual void detach (const char *, bool onIOStrand) = 0; + + //virtual bool samePeer (Peer::ref p) = 0; + //virtual bool samePeer (const Peer& p) = 0; - void connect(const std::string& strIp, int iPort); - void connected(const boost::system::error_code& error); - void detach(const char *, bool onIOStrand); - bool samePeer(Peer::ref p) { return samePeer(*p); } - bool samePeer(const Peer& p) { return this == &p; } + virtual void sendPacket (const PackedMessage::pointer& packet, bool onStrand) = 0; + + virtual void sendGetPeers () = 0; - void sendPacket(const PackedMessage::pointer& packet, bool onStrand); - void sendLedgerProposal(Ledger::ref ledger); - void sendFullLedger(Ledger::ref ledger); - void sendGetFullLedger(uint256& hash); - void sendGetPeers(); - - void punishPeer(LoadType); + virtual void punishPeer (LoadType) = 0; // VFALCO: NOTE, what's with this odd parameter passing? Why the static member? - static void punishPeer(const boost::weak_ptr&, LoadType); + static void punishPeer (const boost::weak_ptr&, LoadType); - Json::Value getJson(); - bool isConnected() const { return mHelloed && !mDetaching; } - bool isInbound() const { return mInbound; } - bool isOutbound() const { return !mInbound; } + virtual Json::Value getJson () = 0; - const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; } - bool hasLedger(const uint256& hash, uint32 seq) const; - bool hasTxSet(const uint256& hash) const; - uint64 getPeerId() const { return mPeerId; } + virtual bool isConnected () const = 0; + + virtual bool isInbound () const = 0; + + virtual bool isOutbound () const = 0; - const RippleAddress& getNodePublic() const { return mNodePublic; } - void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } - bool hasProto(int version); - bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } + virtual const uint256& getClosedLedgerHash () const = 0; -private: - bool mInbound; // Connection is inbound - bool mClientConnect; // In process of connecting as client. - bool mHelloed; // True, if hello accepted. - bool mDetaching; // True, if detaching. - int mActive; // 0=idle, 1=pingsent, 2=active - bool mCluster; // Node in our cluster - RippleAddress mNodePublic; // Node public key of peer. - std::string mNodeName; - ipPort mIpPort; - ipPort mIpPortConnect; - uint256 mCookieHash; - uint64 mPeerId; - bool mPrivate; // Keep peer IP private. - LoadSource mLoad; - uint32 mMinLedger, mMaxLedger; + virtual bool hasLedger (const uint256& hash, uint32 seq) const = 0; + + virtual bool hasTxSet (const uint256& hash) const = 0; + + virtual uint64 getPeerId () const = 0; - uint256 mClosedLedgerHash; - uint256 mPreviousLedgerHash; - std::list mRecentLedgers; - std::list mRecentTxSets; - - boost::asio::ssl::stream mSocketSsl; - - boost::asio::deadline_timer mActivityTimer; - - void handleStart(const boost::system::error_code& ecResult); - void handleVerifyTimer(const boost::system::error_code& ecResult); - void handlePingTimer(const boost::system::error_code& ecResult); - -private: - boost::asio::io_service::strand mIOStrand; - std::vector mReadbuf; - std::list mSendQ; - PackedMessage::pointer mSendingPacket; - ripple::TMStatusChange mLastStatus; - ripple::TMHello mHello; - - Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerId, bool inbound); - - void handleShutdown(const boost::system::error_code& error) { ; } - void handleWrite(const boost::system::error_code& error, size_t bytes_transferred); - void handleReadHeader(const boost::system::error_code& error); - void handleReadBody(const boost::system::error_code& error); - - void processReadBuffer(); - void startReadHeader(); - void startReadBody(unsigned msg_len); - - void sendPacketForce(const PackedMessage::pointer& packet); - - void sendHello(); - - void recvHello(ripple::TMHello& packet); - void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder); - void recvValidation(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder); - void recvGetValidation(ripple::TMGetValidations& packet); - void recvContact(ripple::TMContact& packet); - void recvGetContacts(ripple::TMGetContacts& packet); - void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder); - void recvPeers(ripple::TMPeers& packet); - void recvGetObjectByHash(const boost::shared_ptr& packet); - void recvPing(ripple::TMPing& packet); - void recvErrorMessage(ripple::TMErrorMsg& packet); - void recvSearchTransaction(ripple::TMSearchTransaction& packet); - void recvGetAccount(ripple::TMGetAccount& packet); - void recvAccount(ripple::TMAccount& packet); - void recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder); - void recvLedger(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder); - void recvStatus(ripple::TMStatusChange& packet); - void recvPropose(const boost::shared_ptr& packet); - void recvHaveTxSet(ripple::TMHaveTransactionSet& packet); - void recvProofWork(ripple::TMProofWork& packet); - - void getSessionCookie(std::string& strDst); - - void addLedger(const uint256& ledger); - void addTxSet(const uint256& TxSet); - - void doFetchPack(const boost::shared_ptr& packet); - - // VFALCO: NOTE, why is this a static member instead of a regular member? - static void doProofOfWork(Job&, boost::weak_ptr, ProofOfWork::pointer); + virtual const RippleAddress& getNodePublic () const = 0; + + virtual void cycleStatus () = 0; + + virtual bool hasProto (int version) = 0; + + virtual bool hasRange (uint32 uMin, uint32 uMax) = 0; }; #endif diff --git a/src/cpp/ripple/PeerDoor.cpp b/src/cpp/ripple/PeerDoor.cpp index 1612ee495..e34278017 100644 --- a/src/cpp/ripple/PeerDoor.cpp +++ b/src/cpp/ripple/PeerDoor.cpp @@ -38,8 +38,11 @@ PeerDoor::PeerDoor(boost::asio::io_service& io_service) : void PeerDoor::startListening() { - Peer::pointer new_connection = Peer::create(mAcceptor.get_io_service(), mCtx, - theApp->getConnectionPool().assignPeerId(), true); + Peer::pointer new_connection = Peer::New ( + mAcceptor.get_io_service(), + mCtx, + theApp->getConnectionPool().assignPeerId(), + true); mAcceptor.async_accept(new_connection->getSocket(), boost::bind(&PeerDoor::handleConnect, this, new_connection, diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index bdb891947..4ccdf927b 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -888,7 +888,8 @@ Json::Value RPCHandler::doProofCreate(Json::Value jvRequest, int& cost, ScopedLo if (jvRequest.isMember("difficulty") || jvRequest.isMember("secret")) { - ProofOfWorkFactory pgGen; + // VFALCO: TODO, why aren't we using the app's factory? + beast::ScopedPointer pgGen (IProofOfWorkFactory::New ()); if (jvRequest.isMember("difficulty")) { @@ -900,18 +901,20 @@ Json::Value RPCHandler::doProofCreate(Json::Value jvRequest, int& cost, ScopedLo if (iDifficulty < 0 || iDifficulty > ProofOfWork::sMaxDifficulty) return rpcError(rpcINVALID_PARAMS); - pgGen.setDifficulty(iDifficulty); + pgGen->setDifficulty(iDifficulty); } if (jvRequest.isMember("secret")) { uint256 uSecret(jvRequest["secret"].asString()); - pgGen.setSecret(uSecret); + pgGen->setSecret(uSecret); } - jvResult["token"] = pgGen.getProof().getToken(); - jvResult["secret"] = pgGen.getSecret().GetHex(); - } else { + jvResult["token"] = pgGen->getProof().getToken(); + jvResult["secret"] = pgGen->getSecret().GetHex(); + } + else + { jvResult["token"] = theApp->getProofOfWorkFactory().getProof().getToken(); } @@ -970,7 +973,8 @@ Json::Value RPCHandler::doProofVerify(Json::Value jvRequest, int& cost, ScopedLo POWResult prResult; if (jvRequest.isMember("difficulty") || jvRequest.isMember("secret")) { - ProofOfWorkFactory pgGen; + // VFALCO: TODO, why aren't we using the app's factory? + beast::ScopedPointer pgGen (IProofOfWorkFactory::New ()); if (jvRequest.isMember("difficulty")) { @@ -982,18 +986,18 @@ Json::Value RPCHandler::doProofVerify(Json::Value jvRequest, int& cost, ScopedLo if (iDifficulty < 0 || iDifficulty > ProofOfWork::sMaxDifficulty) return rpcError(rpcINVALID_PARAMS); - pgGen.setDifficulty(iDifficulty); + pgGen->setDifficulty(iDifficulty); } if (jvRequest.isMember("secret")) { uint256 uSecret(jvRequest["secret"].asString()); - pgGen.setSecret(uSecret); + pgGen->setSecret(uSecret); } - prResult = pgGen.checkProof(strToken, uSolution); + prResult = pgGen->checkProof(strToken, uSolution); - jvResult["secret"] = pgGen.getSecret().GetHex(); + jvResult["secret"] = pgGen->getSecret().GetHex(); } else { diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/ripple_Peer.cpp similarity index 80% rename from src/cpp/ripple/Peer.cpp rename to src/cpp/ripple/ripple_Peer.cpp index 35bbefee5..5357014ac 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/ripple_Peer.cpp @@ -1,20 +1,13 @@ -#include - -#include -#include -#include -#include - -#include "Version.h" -#include "Application.h" -#include "SerializedTransaction.h" - // VFALCO: TODO, make this an inline function #define ADDRESS(p) strHex(uint64( ((char*) p) - ((char*) 0))) SETUP_LOG (Peer) -DECLARE_INSTANCE(Peer); +class PeerImp; + +DEFINE_INSTANCE (PeerImp); + +DECLARE_INSTANCE(PeerImp); // Don't try to run past receiving nonsense from a peer #define TRUST_NETWORK @@ -25,7 +18,141 @@ DECLARE_INSTANCE(Peer); // Idle nodes are probed this often #define NODE_IDLE_SECONDS 120 -Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID, bool inbound) : +class PeerImp : public Peer + , public IS_INSTANCE (PeerImp) +{ +public: + PeerImp (boost::asio::io_service& io_service, + boost::asio::ssl::context& ctx, + uint64 peerId, + bool inbound); + + void handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); + + std::string& getIP() { return mIpPort.first; } + std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; } + int getPort() { return mIpPort.second; } + + void setIpPort(const std::string& strIP, int iPort); + + boost::asio::ssl::stream::lowest_layer_type& getSocket() + { + return mSocketSsl.lowest_layer(); + } + + void connect(const std::string& strIp, int iPort); + void connected(const boost::system::error_code& error); + void detach(const char *, bool onIOStrand); + + // VFALCO: Seems no one is using these + //bool samePeer (Peer::ref p) { return samePeer(*p); } + //bool samePeer (const Peer& p) { return this == &p; } + + void sendPacket(const PackedMessage::pointer& packet, bool onStrand); + + void sendGetPeers(); + + void punishPeer(LoadType); + + Json::Value getJson(); + bool isConnected() const { return mHelloed && !mDetaching; } + bool isInbound() const { return mInbound; } + bool isOutbound() const { return !mInbound; } + + const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; } + bool hasLedger(const uint256& hash, uint32 seq) const; + bool hasTxSet(const uint256& hash) const; + uint64 getPeerId() const { return mPeerId; } + + const RippleAddress& getNodePublic() const { return mNodePublic; } + void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } + bool hasProto(int version); + bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } + +private: + bool mInbound; // Connection is inbound + bool mClientConnect; // In process of connecting as client. + bool mHelloed; // True, if hello accepted. + bool mDetaching; // True, if detaching. + int mActive; // 0=idle, 1=pingsent, 2=active + bool mCluster; // Node in our cluster + RippleAddress mNodePublic; // Node public key of peer. + std::string mNodeName; + ipPort mIpPort; + ipPort mIpPortConnect; + uint256 mCookieHash; + uint64 mPeerId; + bool mPrivate; // Keep peer IP private. + LoadSource mLoad; + uint32 mMinLedger, mMaxLedger; + + uint256 mClosedLedgerHash; + uint256 mPreviousLedgerHash; + std::list mRecentLedgers; + std::list mRecentTxSets; + + boost::asio::ssl::stream mSocketSsl; + + boost::asio::deadline_timer mActivityTimer; + + boost::asio::io_service::strand mIOStrand; + std::vector mReadbuf; + std::list mSendQ; + PackedMessage::pointer mSendingPacket; + ripple::TMStatusChange mLastStatus; + ripple::TMHello mHello; + +private: + void handleShutdown(const boost::system::error_code& error) { ; } + void handleWrite(const boost::system::error_code& error, size_t bytes_transferred); + void handleReadHeader(const boost::system::error_code& error); + void handleReadBody(const boost::system::error_code& error); + + void handleStart(const boost::system::error_code& ecResult); + void handleVerifyTimer(const boost::system::error_code& ecResult); + void handlePingTimer(const boost::system::error_code& ecResult); + + void processReadBuffer(); + void startReadHeader(); + void startReadBody(unsigned msg_len); + + void sendPacketForce(const PackedMessage::pointer& packet); + + void sendHello(); + + void recvHello(ripple::TMHello& packet); + void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder); + void recvValidation(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder); + void recvGetValidation(ripple::TMGetValidations& packet); + void recvContact(ripple::TMContact& packet); + void recvGetContacts(ripple::TMGetContacts& packet); + void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder); + void recvPeers(ripple::TMPeers& packet); + void recvGetObjectByHash(const boost::shared_ptr& packet); + void recvPing(ripple::TMPing& packet); + void recvErrorMessage(ripple::TMErrorMsg& packet); + void recvSearchTransaction(ripple::TMSearchTransaction& packet); + void recvGetAccount(ripple::TMGetAccount& packet); + void recvAccount(ripple::TMAccount& packet); + void recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder); + void recvLedger(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder); + void recvStatus(ripple::TMStatusChange& packet); + void recvPropose(const boost::shared_ptr& packet); + void recvHaveTxSet(ripple::TMHaveTransactionSet& packet); + void recvProofWork(ripple::TMProofWork& packet); + + void getSessionCookie(std::string& strDst); + + void addLedger(const uint256& ledger); + void addTxSet(const uint256& TxSet); + + void doFetchPack(const boost::shared_ptr& packet); + + // VFALCO: NOTE, why is this a static member instead of a regular member? + static void doProofOfWork (Job&, boost::weak_ptr , ProofOfWork::pointer); +}; + +PeerImp::PeerImp (boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID, bool inbound) : mInbound(inbound), mHelloed(false), mDetaching(false), @@ -43,11 +170,11 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << ADDRESS(this); } -void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_transferred) +void PeerImp::handleWrite(const boost::system::error_code& error, size_t bytes_transferred) { // Call on IO strand #ifdef DEBUG // if (!error) -// std::cerr << "Peer::handleWrite bytes: "<< bytes_transferred << std::endl; +// std::cerr << "PeerImp::handleWrite bytes: "<< bytes_transferred << std::endl; #endif mSendingPacket.reset(); @@ -75,7 +202,7 @@ void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_tran } } -void Peer::setIpPort(const std::string& strIP, int iPort) +void PeerImp::setIpPort(const std::string& strIP, int iPort) { mIpPort = make_pair(strIP, iPort); mLoad.rename(strIP); @@ -85,11 +212,11 @@ void Peer::setIpPort(const std::string& strIP, int iPort) << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } -void Peer::detach(const char *rsn, bool onIOStrand) +void PeerImp::detach(const char *rsn, bool onIOStrand) { if (!onIOStrand) { - mIOStrand.post(boost::bind(&Peer::detach, shared_from_this(), rsn, true)); + mIOStrand.post(boost::bind (&Peer::detach, shared_from_this(), rsn, true)); return; } if (!mDetaching) @@ -107,7 +234,8 @@ void Peer::detach(const char *rsn, bool onIOStrand) mSendQ.clear(); (void) mActivityTimer.cancel(); - mSocketSsl.async_shutdown(mIOStrand.wrap(boost::bind(&Peer::handleShutdown, shared_from_this(), + mSocketSsl.async_shutdown (mIOStrand.wrap(boost::bind + (&PeerImp::handleShutdown, boost::static_pointer_cast (shared_from_this()), boost::asio::placeholders::error))); if (mNodePublic.isValid()) @@ -134,7 +262,7 @@ void Peer::detach(const char *rsn, bool onIOStrand) } } -void Peer::handlePingTimer(const boost::system::error_code& ecResult) +void PeerImp::handlePingTimer(const boost::system::error_code& ecResult) { // called on IO strand if (ecResult || mDetaching) return; @@ -156,12 +284,14 @@ void Peer::handlePingTimer(const boost::system::error_code& ecResult) mActive = 0; mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS)); - mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(), + mActivityTimer.async_wait (mIOStrand.wrap (boost::bind ( + &PeerImp::handlePingTimer, + boost::static_pointer_cast (shared_from_this()), boost::asio::placeholders::error))); } -void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) +void PeerImp::handleVerifyTimer(const boost::system::error_code& ecResult) { if (ecResult == boost::asio::error::operation_aborted) { @@ -184,7 +314,7 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) // Begin trying to connect. We are not connected till we know and accept peer's public key. // Only takes IP addresses (not domains). -void Peer::connect(const std::string& strIp, int iPort) +void PeerImp::connect(const std::string& strIp, int iPort) { int iPortAct = (iPort <= 0) ? SYSTEM_PEER_PORT : iPort; @@ -209,7 +339,10 @@ void Peer::connect(const std::string& strIp, int iPort) else { mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_VERIFY_SECONDS), err); - mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handleVerifyTimer, shared_from_this(), + + mActivityTimer.async_wait (mIOStrand.wrap (boost::bind ( + &PeerImp::handleVerifyTimer, + boost::static_pointer_cast (shared_from_this()), boost::asio::placeholders::error))); if (err) @@ -228,8 +361,8 @@ void Peer::connect(const std::string& strIp, int iPort) getSocket(), itrEndpoint, mIOStrand.wrap(boost::bind( - &Peer::handleConnect, - shared_from_this(), + &PeerImp::handleConnect, + boost::static_pointer_cast (shared_from_this ()), boost::asio::placeholders::error, boost::asio::placeholders::iterator))); } @@ -239,7 +372,7 @@ void Peer::connect(const std::string& strIp, int iPort) // Have it say who it is so we know to avoid redundant connections. // Establish that it really who we are talking to by having it sign a connection detail. // Also need to establish no man in the middle attack is in progress. -void Peer::handleStart(const boost::system::error_code& error) +void PeerImp::handleStart(const boost::system::error_code& error) { if (error) { @@ -254,7 +387,7 @@ void Peer::handleStart(const boost::system::error_code& error) } // Connect ssl as client. -void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it) +void PeerImp::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it) { if (error) { @@ -267,14 +400,17 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); - mSocketSsl.async_handshake(boost::asio::ssl::stream::client, - mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error))); + mSocketSsl.async_handshake (boost::asio::ssl::stream ::client, + mIOStrand.wrap (boost::bind ( + &PeerImp::handleStart, + boost::static_pointer_cast (shared_from_this()), + boost::asio::placeholders::error))); } } // Connect ssl as server to an inbound connection. // - We don't bother remembering the inbound IP or port. Only useful for debugging. -void Peer::connected(const boost::system::error_code& error) +void PeerImp::connected(const boost::system::error_code& error) { boost::asio::ip::tcp::endpoint ep = getSocket().remote_endpoint(); int iPort = ep.port(); @@ -296,7 +432,7 @@ void Peer::connected(const boost::system::error_code& error) mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); mSocketSsl.async_handshake(boost::asio::ssl::stream::server, - mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error))); + mIOStrand.wrap(boost::bind(&PeerImp::handleStart, boost::static_pointer_cast (shared_from_this()), boost::asio::placeholders::error))); } else if (!mDetaching) { @@ -306,26 +442,27 @@ void Peer::connected(const boost::system::error_code& error) } } -void Peer::sendPacketForce(const PackedMessage::pointer& packet) +void PeerImp::sendPacketForce(const PackedMessage::pointer& packet) { // must be on IO strand if (!mDetaching) { mSendingPacket = packet; boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()), - mIOStrand.wrap(boost::bind(&Peer::handleWrite, shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); + mIOStrand.wrap (boost::bind(&PeerImp::handleWrite, + boost::static_pointer_cast (shared_from_this()), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred))); } } -void Peer::sendPacket(const PackedMessage::pointer& packet, bool onStrand) +void PeerImp::sendPacket(const PackedMessage::pointer& packet, bool onStrand) { if (packet) { if (!onStrand) { - mIOStrand.post(boost::bind(&Peer::sendPacket, shared_from_this(), packet, true)); + mIOStrand.post(boost::bind (&Peer::sendPacket, shared_from_this(), packet, true)); return; } if (mSendingPacket) @@ -339,19 +476,22 @@ void Peer::sendPacket(const PackedMessage::pointer& packet, bool onStrand) } } -void Peer::startReadHeader() +void PeerImp::startReadHeader() { if (!mDetaching) { mReadbuf.clear(); mReadbuf.resize(PackedMessage::kHeaderBytes); - boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), mIOStrand.wrap( - boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error))); + boost::asio::async_read(mSocketSsl, + boost::asio::buffer(mReadbuf), + mIOStrand.wrap (boost::bind (&PeerImp::handleReadHeader, + boost::static_pointer_cast (shared_from_this()), + boost::asio::placeholders::error))); } } -void Peer::startReadBody(unsigned msg_len) +void PeerImp::startReadBody(unsigned msg_len) { // m_readbuf already contains the header in its first PackedMessage::kHeaderBytes // bytes. Expand it to fit in the body as well, and start async @@ -361,12 +501,15 @@ void Peer::startReadBody(unsigned msg_len) { mReadbuf.resize(PackedMessage::kHeaderBytes + msg_len); - boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[PackedMessage::kHeaderBytes], msg_len), - mIOStrand.wrap(boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error))); + boost::asio::async_read (mSocketSsl, + boost::asio::buffer (&mReadbuf [PackedMessage::kHeaderBytes], msg_len), + mIOStrand.wrap (boost::bind (&PeerImp::handleReadBody, + boost::static_pointer_cast (shared_from_this()), + boost::asio::placeholders::error))); } } -void Peer::handleReadHeader(const boost::system::error_code& error) +void PeerImp::handleReadHeader(const boost::system::error_code& error) { if (mDetaching) { @@ -399,7 +542,7 @@ void Peer::handleReadHeader(const boost::system::error_code& error) } } -void Peer::handleReadBody(const boost::system::error_code& error) +void PeerImp::handleReadBody(const boost::system::error_code& error) { if (mDetaching) { @@ -425,16 +568,16 @@ void Peer::handleReadBody(const boost::system::error_code& error) startReadHeader(); } -void Peer::processReadBuffer() +void PeerImp::processReadBuffer() { // must not hold peer lock int type = PackedMessage::getType(mReadbuf); #ifdef DEBUG // std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-PackedMessage::kHeaderBytes) << std::endl; #endif -// std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl; +// std::cerr << "PeerImp::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl; - LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPEER, "Peer::read")); + LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPEER, "PeerImp::read")); ScopedLock sl(theApp->getMasterLock()); @@ -450,7 +593,7 @@ void Peer::processReadBuffer() { case ripple::mtHELLO: { - event->reName("Peer::hello"); + event->reName("PeerImp::hello"); ripple::TMHello msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvHello(msg); @@ -461,7 +604,7 @@ void Peer::processReadBuffer() case ripple::mtERROR_MSG: { - event->reName("Peer::errormessage"); + event->reName("PeerImp::errormessage"); ripple::TMErrorMsg msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvErrorMessage(msg); @@ -472,7 +615,7 @@ void Peer::processReadBuffer() case ripple::mtPING: { - event->reName("Peer::ping"); + event->reName("PeerImp::ping"); ripple::TMPing msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvPing(msg); @@ -483,7 +626,7 @@ void Peer::processReadBuffer() case ripple::mtGET_CONTACTS: { - event->reName("Peer::getcontacts"); + event->reName("PeerImp::getcontacts"); ripple::TMGetContacts msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetContacts(msg); @@ -494,7 +637,7 @@ void Peer::processReadBuffer() case ripple::mtCONTACT: { - event->reName("Peer::contact"); + event->reName("PeerImp::contact"); ripple::TMContact msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) @@ -506,7 +649,7 @@ void Peer::processReadBuffer() case ripple::mtGET_PEERS: { - event->reName("Peer::getpeers"); + event->reName("PeerImp::getpeers"); ripple::TMGetPeers msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) @@ -518,7 +661,7 @@ void Peer::processReadBuffer() case ripple::mtPEERS: { - event->reName("Peer::peers"); + event->reName("PeerImp::peers"); ripple::TMPeers msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) @@ -530,7 +673,7 @@ void Peer::processReadBuffer() case ripple::mtSEARCH_TRANSACTION: { - event->reName("Peer::searchtransaction"); + event->reName("PeerImp::searchtransaction"); ripple::TMSearchTransaction msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvSearchTransaction(msg); @@ -541,7 +684,7 @@ void Peer::processReadBuffer() case ripple::mtGET_ACCOUNT: { - event->reName("Peer::getaccount"); + event->reName("PeerImp::getaccount"); ripple::TMGetAccount msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetAccount(msg); @@ -552,7 +695,7 @@ void Peer::processReadBuffer() case ripple::mtACCOUNT: { - event->reName("Peer::account"); + event->reName("PeerImp::account"); ripple::TMAccount msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvAccount(msg); @@ -563,7 +706,7 @@ void Peer::processReadBuffer() case ripple::mtTRANSACTION: { - event->reName("Peer::transaction"); + event->reName("PeerImp::transaction"); ripple::TMTransaction msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvTransaction(msg, sl); @@ -574,7 +717,7 @@ void Peer::processReadBuffer() case ripple::mtSTATUS_CHANGE: { - event->reName("Peer::statuschange"); + event->reName("PeerImp::statuschange"); ripple::TMStatusChange msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvStatus(msg); @@ -585,7 +728,7 @@ void Peer::processReadBuffer() case ripple::mtPROPOSE_LEDGER: { - event->reName("Peer::propose"); + event->reName("PeerImp::propose"); boost::shared_ptr msg = boost::make_shared(); if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvPropose(msg); @@ -596,7 +739,7 @@ void Peer::processReadBuffer() case ripple::mtGET_LEDGER: { - event->reName("Peer::getledger"); + event->reName("PeerImp::getledger"); ripple::TMGetLedger msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetLedger(msg, sl); @@ -607,7 +750,7 @@ void Peer::processReadBuffer() case ripple::mtLEDGER_DATA: { - event->reName("Peer::ledgerdata"); + event->reName("PeerImp::ledgerdata"); boost::shared_ptr msg = boost::make_shared(); if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvLedger(msg, sl); @@ -618,7 +761,7 @@ void Peer::processReadBuffer() case ripple::mtHAVE_SET: { - event->reName("Peer::haveset"); + event->reName("PeerImp::haveset"); ripple::TMHaveTransactionSet msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvHaveTxSet(msg); @@ -629,7 +772,7 @@ void Peer::processReadBuffer() case ripple::mtVALIDATION: { - event->reName("Peer::validation"); + event->reName("PeerImp::validation"); boost::shared_ptr msg = boost::make_shared(); if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvValidation(msg, sl); @@ -651,7 +794,7 @@ void Peer::processReadBuffer() #endif case ripple::mtGET_OBJECTS: { - event->reName("Peer::getobjects"); + event->reName("PeerImp::getobjects"); boost::shared_ptr msg = boost::make_shared(); if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetObjectByHash(msg); @@ -662,7 +805,7 @@ void Peer::processReadBuffer() case ripple::mtPROOFOFWORK: { - event->reName("Peer::proofofwork"); + event->reName("PeerImp::proofofwork"); ripple::TMProofWork msg; if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvProofWork(msg); @@ -673,27 +816,20 @@ void Peer::processReadBuffer() default: - event->reName("Peer::unknown"); + event->reName("PeerImp::unknown"); WriteLog (lsWARNING, Peer) << "Unknown Msg: " << type; WriteLog (lsWARNING, Peer) << strHex(&mReadbuf[0], mReadbuf.size()); } } } -void Peer::punishPeer(const boost::weak_ptr& wp, LoadType l) -{ - Peer::pointer p = wp.lock(); - if (p) - p->punishPeer(l); -} - -void Peer::recvHello(ripple::TMHello& packet) +void PeerImp::recvHello(ripple::TMHello& packet) { bool bDetach = true; (void) mActivityTimer.cancel(); mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS)); - mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(), + mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&PeerImp::handlePingTimer, boost::static_pointer_cast (shared_from_this()), boost::asio::placeholders::error))); uint32 ourTime = theApp->getOPs().getNetworkTimeNC(); @@ -862,7 +998,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx #endif } -void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder) +void PeerImp::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder) { MasterLockHolder.unlock(); Transaction::pointer tx; @@ -909,9 +1045,10 @@ void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLock } +// Called from our JobQueue static void checkPropose(Job& job, boost::shared_ptr packet, LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, boost::weak_ptr peer) -{ // Called from our JobQueue +{ bool sigGood = false; bool isTrusted = (job.getType() == jtPROPOSAL_t); @@ -922,7 +1059,8 @@ static void checkPropose(Job& job, boost::shared_ptr packe uint256 prevLedger; if (set.has_previousledger()) - { // proposal includes a previous ledger + { + // proposal includes a previous ledger WriteLog (lsTRACE, Peer) << "proposal with previous ledger"; memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8); if (!proposal->checkSign(set.signature())) @@ -968,7 +1106,7 @@ static void checkPropose(Job& job, boost::shared_ptr packe WriteLog (lsDEBUG, Peer) << "Not relaying untrusted proposal"; } -void Peer::recvPropose(const boost::shared_ptr& packet) +void PeerImp::recvPropose(const boost::shared_ptr& packet) { assert(packet); ripple::TMProposeSet& set = *packet; @@ -1028,7 +1166,7 @@ void Peer::recvPropose(const boost::shared_ptr& packet) mNodePublic, boost::weak_ptr(shared_from_this()))); } -void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet) +void PeerImp::recvHaveTxSet(ripple::TMHaveTransactionSet& packet) { uint256 hashes; if (packet.hash().size() != (256 / 8)) @@ -1082,7 +1220,7 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig #endif } -void Peer::recvValidation(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder) +void PeerImp::recvValidation(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder) { MasterLockHolder.unlock(); if (packet->validation().size() < 50) @@ -1121,22 +1259,22 @@ void Peer::recvValidation(const boost::shared_ptr& packet, #endif } -void Peer::recvGetValidation(ripple::TMGetValidations& packet) +void PeerImp::recvGetValidation(ripple::TMGetValidations& packet) { } -void Peer::recvContact(ripple::TMContact& packet) +void PeerImp::recvContact(ripple::TMContact& packet) { } -void Peer::recvGetContacts(ripple::TMGetContacts& packet) +void PeerImp::recvGetContacts(ripple::TMGetContacts& packet) { } // Return a list of your favorite people // TODO: filter out all the LAN peers // TODO: filter out the peer you are talking to -void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder) +void PeerImp::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder) { MasterLockHolder.unlock(); std::vector addrs; @@ -1168,7 +1306,7 @@ void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder } // TODO: filter out all the LAN peers -void Peer::recvPeers(ripple::TMPeers& packet) +void PeerImp::recvPeers(ripple::TMPeers& packet) { for (int i = 0; i < packet.nodes().size(); ++i) { @@ -1188,7 +1326,7 @@ void Peer::recvPeers(ripple::TMPeers& packet) } } -void Peer::recvGetObjectByHash(const boost::shared_ptr& ptr) +void PeerImp::recvGetObjectByHash(const boost::shared_ptr& ptr) { ripple::TMGetObjectByHash& packet = *ptr; @@ -1278,7 +1416,7 @@ void Peer::recvGetObjectByHash(const boost::shared_ptrgetJobQueue().addJob(jtPROOFWORK, "recvProof->doProof", - BIND_TYPE(&Peer::doProofOfWork, P_1, boost::weak_ptr(shared_from_this()), pow)); + theApp->getJobQueue ().addJob ( + jtPROOFWORK, + "recvProof->doProof", + BIND_TYPE (&PeerImp::doProofOfWork, P_1, boost::weak_ptr (shared_from_this()), pow)); return; } @@ -1366,7 +1506,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet) WriteLog (lsINFO, Peer) << "Received in valid proof of work object from peer"; } -void Peer::recvStatus(ripple::TMStatusChange& packet) +void PeerImp::recvStatus(ripple::TMStatusChange& packet) { WriteLog (lsTRACE, Peer) << "Received status change from peer " << getIP(); if (!packet.has_networktime()) @@ -1416,7 +1556,7 @@ void Peer::recvStatus(ripple::TMStatusChange& packet) mMaxLedger = packet.lastseq(); } -void Peer::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder) +void PeerImp::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder) { SHAMap::pointer map; ripple::TMLedgerData reply; @@ -1663,7 +1803,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHold sendPacket(oPacket, true); } -void Peer::recvLedger(const boost::shared_ptr& packet_ptr, ScopedLock& MasterLockHolder) +void PeerImp::recvLedger(const boost::shared_ptr& packet_ptr, ScopedLock& MasterLockHolder) { MasterLockHolder.unlock(); ripple::TMLedgerData& packet = *packet_ptr; @@ -1730,7 +1870,7 @@ void Peer::recvLedger(const boost::shared_ptr& packet_ptr, punishPeer(LT_UnwantedData); } -bool Peer::hasLedger(const uint256& hash, uint32 seq) const +bool PeerImp::hasLedger(const uint256& hash, uint32 seq) const { if ((seq != 0) && (seq >= mMinLedger) && (seq <= mMaxLedger)) return true; @@ -1740,7 +1880,7 @@ bool Peer::hasLedger(const uint256& hash, uint32 seq) const return false; } -void Peer::addLedger(const uint256& hash) +void PeerImp::addLedger(const uint256& hash) { BOOST_FOREACH(const uint256& ledger, mRecentLedgers) if (ledger == hash) @@ -1750,7 +1890,7 @@ void Peer::addLedger(const uint256& hash) mRecentLedgers.push_back(hash); } -bool Peer::hasTxSet(const uint256& hash) const +bool PeerImp::hasTxSet(const uint256& hash) const { BOOST_FOREACH(const uint256& set, mRecentTxSets) if (set == hash) @@ -1758,7 +1898,7 @@ bool Peer::hasTxSet(const uint256& hash) const return false; } -void Peer::addTxSet(const uint256& hash) +void PeerImp::addTxSet(const uint256& hash) { BOOST_FOREACH(const uint256& set, mRecentTxSets) if (set == hash) @@ -1770,7 +1910,7 @@ void Peer::addTxSet(const uint256& hash) // Get session information we can sign to prevent man in the middle attack. // (both sides get the same information, neither side controls it) -void Peer::getSessionCookie(std::string& strDst) +void PeerImp::getSessionCookie(std::string& strDst) { SSL* ssl = mSocketSsl.native_handle(); if (!ssl) throw std::runtime_error("No underlying connection"); @@ -1797,7 +1937,7 @@ void Peer::getSessionCookie(std::string& strDst) strDst.assign((char *) &sha1[0], sizeof(sha1)); } -void Peer::sendHello() +void PeerImp::sendHello() { std::string strCookie; std::vector vchSig; @@ -1832,7 +1972,7 @@ void Peer::sendHello() sendPacket(packet, true); } -void Peer::sendGetPeers() +void PeerImp::sendGetPeers() { // Ask peer for known other peers. ripple::TMGetPeers getPeers; @@ -1844,7 +1984,7 @@ void Peer::sendGetPeers() sendPacket(packet, true); } -void Peer::punishPeer(LoadType l) +void PeerImp::punishPeer(LoadType l) { if (theApp->getLoadManager().adjust(mLoad, l)) { @@ -1852,7 +1992,7 @@ void Peer::punishPeer(LoadType l) } } -void Peer::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer pow) +void PeerImp::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer pow) { if (peer.expired()) return; @@ -1879,7 +2019,7 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer } } -void Peer::doFetchPack(const boost::shared_ptr& packet) +void PeerImp::doFetchPack(const boost::shared_ptr& packet) { if (theApp->getFeeTrack().isLoaded()) { @@ -1921,12 +2061,12 @@ void Peer::doFetchPack(const boost::shared_ptr& packe boost::weak_ptr(shared_from_this()), packet, wantLedger, haveLedger, UptimeTimer::getInstance().getElapsedSeconds ())); } -bool Peer::hasProto(int version) +bool PeerImp::hasProto(int version) { return mHello.has_protoversion() && (mHello.protoversion() >= version); } -Json::Value Peer::getJson() +Json::Value PeerImp::getJson() { Json::Value ret(Json::objectValue); @@ -1978,4 +2118,21 @@ Json::Value Peer::getJson() return ret; } +//------------------------------------------------------------------------------ + +Peer::pointer Peer::New (boost::asio::io_service& io_service, + boost::asio::ssl::context& ctx, + uint64 id, + bool inbound) +{ + return Peer::pointer (new PeerImp (io_service, ctx, id, inbound)); +} + +void Peer::punishPeer(const boost::weak_ptr& wp, LoadType l) +{ + Peer::pointer p = wp.lock(); + if (p) + p->punishPeer(l); +} + // vim:ts=4 diff --git a/src/cpp/ripple/ripple_Peer.h b/src/cpp/ripple/ripple_Peer.h new file mode 100644 index 000000000..f5146c7b5 --- /dev/null +++ b/src/cpp/ripple/ripple_Peer.h @@ -0,0 +1,95 @@ +#ifndef __PEER__ +#define __PEER__ + +#include +#include +#include +#include +#include +#include + +#include "Ledger.h" +#include "Transaction.h" +#include "LoadManager.h" + +typedef std::pair ipPort; + +class Peer : public boost::enable_shared_from_this +{ +public: + typedef boost::shared_ptr pointer; + typedef const boost::shared_ptr& ref; + + static int const psbGotHello = 0; + static int const psbSentHello = 1; + static int const psbInMap = 2; + static int const psbTrusted = 3; + static int const psbNoLedgers = 4; + static int const psbNoTransactions = 5; + static int const psbDownLevel = 6; + +public: + static pointer New (boost::asio::io_service& io_service, + boost::asio::ssl::context& ctx, + uint64 id, + bool inbound); + + // VFALCO: TODO see if this and below can be private + virtual void handleConnect (const boost::system::error_code& error, + boost::asio::ip::tcp::resolver::iterator it) = 0; + + virtual std::string& getIP () = 0; + + virtual std::string getDisplayName() = 0; + + virtual int getPort () = 0; + + virtual void setIpPort (const std::string& strIP, int iPort) = 0; + + virtual boost::asio::ssl::stream::lowest_layer_type& getSocket() = 0; + + virtual void connect (const std::string& strIp, int iPort) = 0; + + virtual void connected (const boost::system::error_code& error) = 0; + + virtual void detach (const char *, bool onIOStrand) = 0; + + //virtual bool samePeer (Peer::ref p) = 0; + //virtual bool samePeer (const Peer& p) = 0; + + virtual void sendPacket (const PackedMessage::pointer& packet, bool onStrand) = 0; + + virtual void sendGetPeers () = 0; + + virtual void punishPeer (LoadType) = 0; + + // VFALCO: NOTE, what's with this odd parameter passing? Why the static member? + static void punishPeer (const boost::weak_ptr&, LoadType); + + virtual Json::Value getJson () = 0; + + virtual bool isConnected () const = 0; + + virtual bool isInbound () const = 0; + + virtual bool isOutbound () const = 0; + + virtual const uint256& getClosedLedgerHash () const = 0; + + virtual bool hasLedger (const uint256& hash, uint32 seq) const = 0; + + virtual bool hasTxSet (const uint256& hash) const = 0; + + virtual uint64 getPeerId () const = 0; + + virtual const RippleAddress& getNodePublic () const = 0; + + virtual void cycleStatus () = 0; + + virtual bool hasProto (int version) = 0; + + virtual bool hasRange (uint32 uMin, uint32 uMax) = 0; +}; + +#endif +// vim:ts=4