Refactor Peer

This commit is contained in:
Vinnie Falco
2013-06-05 08:38:27 -07:00
parent 38edcc8f0d
commit d8c97c2149
10 changed files with 460 additions and 287 deletions

View File

@@ -309,8 +309,10 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort)
boost::recursive_mutex::scoped_lock sl(mPeerLock);
if (mIpMap.find(pipPeer) == mIpMap.end())
{
ppResult = Peer::create(theApp->getIOService(), theApp->getPeerDoor().getSSLContext(),
++mLastPeer, false);
ppResult = Peer::New (theApp->getIOService(),
theApp->getPeerDoor().getSSLContext(),
++mLastPeer,
false);
mIpMap[pipPeer] = ppResult;
// ++miConnectStarting;

View File

@@ -1,23 +1,9 @@
#ifndef __PEER__
#define __PEER__
#ifndef RIPPLE_PEER_H
#define RIPPLE_PEER_H
#include <bitset>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include "Ledger.h"
#include "Transaction.h"
#include "LoadManager.h"
typedef std::pair<std::string,int> ipPort;
DEFINE_INSTANCE(Peer);
typedef std::pair <std::string,int> ipPort;
class Peer : public boost::enable_shared_from_this <Peer>
, public IS_INSTANCE (Peer)
{
public:
typedef boost::shared_ptr<Peer> pointer;
@@ -32,141 +18,66 @@ public:
static int const psbDownLevel = 6;
public:
//bool operator == (const Peer& other);
static pointer New (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 id,
bool inbound);
void handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
// VFALCO: TODO see if this and below can be private
virtual void handleConnect (const boost::system::error_code& error,
boost::asio::ip::tcp::resolver::iterator it) = 0;
std::string& getIP() { return mIpPort.first; }
std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; }
int getPort() { return mIpPort.second; }
virtual std::string& getIP () = 0;
virtual std::string getDisplayName() = 0;
virtual int getPort () = 0;
void setIpPort(const std::string& strIP, int iPort);
virtual void setIpPort (const std::string& strIP, int iPort) = 0;
static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id, bool inbound)
{
return pointer(new Peer(io_service, ctx, id, inbound));
}
virtual boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket() = 0;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket()
{
return mSocketSsl.lowest_layer();
}
virtual void connect (const std::string& strIp, int iPort) = 0;
virtual void connected (const boost::system::error_code& error) = 0;
virtual void detach (const char *, bool onIOStrand) = 0;
//virtual bool samePeer (Peer::ref p) = 0;
//virtual bool samePeer (const Peer& p) = 0;
void connect(const std::string& strIp, int iPort);
void connected(const boost::system::error_code& error);
void detach(const char *, bool onIOStrand);
bool samePeer(Peer::ref p) { return samePeer(*p); }
bool samePeer(const Peer& p) { return this == &p; }
virtual void sendPacket (const PackedMessage::pointer& packet, bool onStrand) = 0;
virtual void sendGetPeers () = 0;
void sendPacket(const PackedMessage::pointer& packet, bool onStrand);
void sendLedgerProposal(Ledger::ref ledger);
void sendFullLedger(Ledger::ref ledger);
void sendGetFullLedger(uint256& hash);
void sendGetPeers();
void punishPeer(LoadType);
virtual void punishPeer (LoadType) = 0;
// VFALCO: NOTE, what's with this odd parameter passing? Why the static member?
static void punishPeer(const boost::weak_ptr<Peer>&, LoadType);
static void punishPeer (const boost::weak_ptr<Peer>&, LoadType);
Json::Value getJson();
bool isConnected() const { return mHelloed && !mDetaching; }
bool isInbound() const { return mInbound; }
bool isOutbound() const { return !mInbound; }
virtual Json::Value getJson () = 0;
const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; }
bool hasLedger(const uint256& hash, uint32 seq) const;
bool hasTxSet(const uint256& hash) const;
uint64 getPeerId() const { return mPeerId; }
virtual bool isConnected () const = 0;
virtual bool isInbound () const = 0;
virtual bool isOutbound () const = 0;
const RippleAddress& getNodePublic() const { return mNodePublic; }
void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); }
bool hasProto(int version);
bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); }
virtual const uint256& getClosedLedgerHash () const = 0;
private:
bool mInbound; // Connection is inbound
bool mClientConnect; // In process of connecting as client.
bool mHelloed; // True, if hello accepted.
bool mDetaching; // True, if detaching.
int mActive; // 0=idle, 1=pingsent, 2=active
bool mCluster; // Node in our cluster
RippleAddress mNodePublic; // Node public key of peer.
std::string mNodeName;
ipPort mIpPort;
ipPort mIpPortConnect;
uint256 mCookieHash;
uint64 mPeerId;
bool mPrivate; // Keep peer IP private.
LoadSource mLoad;
uint32 mMinLedger, mMaxLedger;
virtual bool hasLedger (const uint256& hash, uint32 seq) const = 0;
virtual bool hasTxSet (const uint256& hash) const = 0;
virtual uint64 getPeerId () const = 0;
uint256 mClosedLedgerHash;
uint256 mPreviousLedgerHash;
std::list<uint256> mRecentLedgers;
std::list<uint256> mRecentTxSets;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> mSocketSsl;
boost::asio::deadline_timer mActivityTimer;
void handleStart(const boost::system::error_code& ecResult);
void handleVerifyTimer(const boost::system::error_code& ecResult);
void handlePingTimer(const boost::system::error_code& ecResult);
private:
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
ripple::TMStatusChange mLastStatus;
ripple::TMHello mHello;
Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerId, bool inbound);
void handleShutdown(const boost::system::error_code& error) { ; }
void handleWrite(const boost::system::error_code& error, size_t bytes_transferred);
void handleReadHeader(const boost::system::error_code& error);
void handleReadBody(const boost::system::error_code& error);
void processReadBuffer();
void startReadHeader();
void startReadBody(unsigned msg_len);
void sendPacketForce(const PackedMessage::pointer& packet);
void sendHello();
void recvHello(ripple::TMHello& packet);
void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder);
void recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder);
void recvGetValidation(ripple::TMGetValidations& packet);
void recvContact(ripple::TMContact& packet);
void recvGetContacts(ripple::TMGetContacts& packet);
void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder);
void recvPeers(ripple::TMPeers& packet);
void recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
void recvPing(ripple::TMPing& packet);
void recvErrorMessage(ripple::TMErrorMsg& packet);
void recvSearchTransaction(ripple::TMSearchTransaction& packet);
void recvGetAccount(ripple::TMGetAccount& packet);
void recvAccount(ripple::TMAccount& packet);
void recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder);
void recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet, ScopedLock& MasterLockHolder);
void recvStatus(ripple::TMStatusChange& packet);
void recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet);
void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);
void recvProofWork(ripple::TMProofWork& packet);
void getSessionCookie(std::string& strDst);
void addLedger(const uint256& ledger);
void addTxSet(const uint256& TxSet);
void doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
// VFALCO: NOTE, why is this a static member instead of a regular member?
static void doProofOfWork(Job&, boost::weak_ptr<Peer>, ProofOfWork::pointer);
virtual const RippleAddress& getNodePublic () const = 0;
virtual void cycleStatus () = 0;
virtual bool hasProto (int version) = 0;
virtual bool hasRange (uint32 uMin, uint32 uMax) = 0;
};
#endif

View File

@@ -38,8 +38,11 @@ PeerDoor::PeerDoor(boost::asio::io_service& io_service) :
void PeerDoor::startListening()
{
Peer::pointer new_connection = Peer::create(mAcceptor.get_io_service(), mCtx,
theApp->getConnectionPool().assignPeerId(), true);
Peer::pointer new_connection = Peer::New (
mAcceptor.get_io_service(),
mCtx,
theApp->getConnectionPool().assignPeerId(),
true);
mAcceptor.async_accept(new_connection->getSocket(),
boost::bind(&PeerDoor::handleConnect, this, new_connection,

View File

@@ -888,7 +888,8 @@ Json::Value RPCHandler::doProofCreate(Json::Value jvRequest, int& cost, ScopedLo
if (jvRequest.isMember("difficulty") || jvRequest.isMember("secret"))
{
ProofOfWorkFactory pgGen;
// VFALCO: TODO, why aren't we using the app's factory?
beast::ScopedPointer <IProofOfWorkFactory> pgGen (IProofOfWorkFactory::New ());
if (jvRequest.isMember("difficulty"))
{
@@ -900,18 +901,20 @@ Json::Value RPCHandler::doProofCreate(Json::Value jvRequest, int& cost, ScopedLo
if (iDifficulty < 0 || iDifficulty > ProofOfWork::sMaxDifficulty)
return rpcError(rpcINVALID_PARAMS);
pgGen.setDifficulty(iDifficulty);
pgGen->setDifficulty(iDifficulty);
}
if (jvRequest.isMember("secret"))
{
uint256 uSecret(jvRequest["secret"].asString());
pgGen.setSecret(uSecret);
pgGen->setSecret(uSecret);
}
jvResult["token"] = pgGen.getProof().getToken();
jvResult["secret"] = pgGen.getSecret().GetHex();
} else {
jvResult["token"] = pgGen->getProof().getToken();
jvResult["secret"] = pgGen->getSecret().GetHex();
}
else
{
jvResult["token"] = theApp->getProofOfWorkFactory().getProof().getToken();
}
@@ -970,7 +973,8 @@ Json::Value RPCHandler::doProofVerify(Json::Value jvRequest, int& cost, ScopedLo
POWResult prResult;
if (jvRequest.isMember("difficulty") || jvRequest.isMember("secret"))
{
ProofOfWorkFactory pgGen;
// VFALCO: TODO, why aren't we using the app's factory?
beast::ScopedPointer <IProofOfWorkFactory> pgGen (IProofOfWorkFactory::New ());
if (jvRequest.isMember("difficulty"))
{
@@ -982,18 +986,18 @@ Json::Value RPCHandler::doProofVerify(Json::Value jvRequest, int& cost, ScopedLo
if (iDifficulty < 0 || iDifficulty > ProofOfWork::sMaxDifficulty)
return rpcError(rpcINVALID_PARAMS);
pgGen.setDifficulty(iDifficulty);
pgGen->setDifficulty(iDifficulty);
}
if (jvRequest.isMember("secret"))
{
uint256 uSecret(jvRequest["secret"].asString());
pgGen.setSecret(uSecret);
pgGen->setSecret(uSecret);
}
prResult = pgGen.checkProof(strToken, uSolution);
prResult = pgGen->checkProof(strToken, uSolution);
jvResult["secret"] = pgGen.getSecret().GetHex();
jvResult["secret"] = pgGen->getSecret().GetHex();
}
else
{

View File

@@ -1,20 +1,13 @@
#include <iostream>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
#include "Version.h"
#include "Application.h"
#include "SerializedTransaction.h"
// VFALCO: TODO, make this an inline function
#define ADDRESS(p) strHex(uint64( ((char*) p) - ((char*) 0)))
SETUP_LOG (Peer)
DECLARE_INSTANCE(Peer);
class PeerImp;
DEFINE_INSTANCE (PeerImp);
DECLARE_INSTANCE(PeerImp);
// Don't try to run past receiving nonsense from a peer
#define TRUST_NETWORK
@@ -25,7 +18,141 @@ DECLARE_INSTANCE(Peer);
// Idle nodes are probed this often
#define NODE_IDLE_SECONDS 120
Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID, bool inbound) :
class PeerImp : public Peer
, public IS_INSTANCE (PeerImp)
{
public:
PeerImp (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 peerId,
bool inbound);
void handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
std::string& getIP() { return mIpPort.first; }
std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; }
int getPort() { return mIpPort.second; }
void setIpPort(const std::string& strIP, int iPort);
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket()
{
return mSocketSsl.lowest_layer();
}
void connect(const std::string& strIp, int iPort);
void connected(const boost::system::error_code& error);
void detach(const char *, bool onIOStrand);
// VFALCO: Seems no one is using these
//bool samePeer (Peer::ref p) { return samePeer(*p); }
//bool samePeer (const Peer& p) { return this == &p; }
void sendPacket(const PackedMessage::pointer& packet, bool onStrand);
void sendGetPeers();
void punishPeer(LoadType);
Json::Value getJson();
bool isConnected() const { return mHelloed && !mDetaching; }
bool isInbound() const { return mInbound; }
bool isOutbound() const { return !mInbound; }
const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; }
bool hasLedger(const uint256& hash, uint32 seq) const;
bool hasTxSet(const uint256& hash) const;
uint64 getPeerId() const { return mPeerId; }
const RippleAddress& getNodePublic() const { return mNodePublic; }
void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); }
bool hasProto(int version);
bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); }
private:
bool mInbound; // Connection is inbound
bool mClientConnect; // In process of connecting as client.
bool mHelloed; // True, if hello accepted.
bool mDetaching; // True, if detaching.
int mActive; // 0=idle, 1=pingsent, 2=active
bool mCluster; // Node in our cluster
RippleAddress mNodePublic; // Node public key of peer.
std::string mNodeName;
ipPort mIpPort;
ipPort mIpPortConnect;
uint256 mCookieHash;
uint64 mPeerId;
bool mPrivate; // Keep peer IP private.
LoadSource mLoad;
uint32 mMinLedger, mMaxLedger;
uint256 mClosedLedgerHash;
uint256 mPreviousLedgerHash;
std::list<uint256> mRecentLedgers;
std::list<uint256> mRecentTxSets;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> mSocketSsl;
boost::asio::deadline_timer mActivityTimer;
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
ripple::TMStatusChange mLastStatus;
ripple::TMHello mHello;
private:
void handleShutdown(const boost::system::error_code& error) { ; }
void handleWrite(const boost::system::error_code& error, size_t bytes_transferred);
void handleReadHeader(const boost::system::error_code& error);
void handleReadBody(const boost::system::error_code& error);
void handleStart(const boost::system::error_code& ecResult);
void handleVerifyTimer(const boost::system::error_code& ecResult);
void handlePingTimer(const boost::system::error_code& ecResult);
void processReadBuffer();
void startReadHeader();
void startReadBody(unsigned msg_len);
void sendPacketForce(const PackedMessage::pointer& packet);
void sendHello();
void recvHello(ripple::TMHello& packet);
void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder);
void recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder);
void recvGetValidation(ripple::TMGetValidations& packet);
void recvContact(ripple::TMContact& packet);
void recvGetContacts(ripple::TMGetContacts& packet);
void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder);
void recvPeers(ripple::TMPeers& packet);
void recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
void recvPing(ripple::TMPing& packet);
void recvErrorMessage(ripple::TMErrorMsg& packet);
void recvSearchTransaction(ripple::TMSearchTransaction& packet);
void recvGetAccount(ripple::TMGetAccount& packet);
void recvAccount(ripple::TMAccount& packet);
void recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder);
void recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet, ScopedLock& MasterLockHolder);
void recvStatus(ripple::TMStatusChange& packet);
void recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet);
void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);
void recvProofWork(ripple::TMProofWork& packet);
void getSessionCookie(std::string& strDst);
void addLedger(const uint256& ledger);
void addTxSet(const uint256& TxSet);
void doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
// VFALCO: NOTE, why is this a static member instead of a regular member?
static void doProofOfWork (Job&, boost::weak_ptr <Peer>, ProofOfWork::pointer);
};
PeerImp::PeerImp (boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID, bool inbound) :
mInbound(inbound),
mHelloed(false),
mDetaching(false),
@@ -43,11 +170,11 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx,
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << ADDRESS(this);
}
void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_transferred)
void PeerImp::handleWrite(const boost::system::error_code& error, size_t bytes_transferred)
{ // Call on IO strand
#ifdef DEBUG
// if (!error)
// std::cerr << "Peer::handleWrite bytes: "<< bytes_transferred << std::endl;
// std::cerr << "PeerImp::handleWrite bytes: "<< bytes_transferred << std::endl;
#endif
mSendingPacket.reset();
@@ -75,7 +202,7 @@ void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_tran
}
}
void Peer::setIpPort(const std::string& strIP, int iPort)
void PeerImp::setIpPort(const std::string& strIP, int iPort)
{
mIpPort = make_pair(strIP, iPort);
mLoad.rename(strIP);
@@ -85,11 +212,11 @@ void Peer::setIpPort(const std::string& strIP, int iPort)
<< (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort();
}
void Peer::detach(const char *rsn, bool onIOStrand)
void PeerImp::detach(const char *rsn, bool onIOStrand)
{
if (!onIOStrand)
{
mIOStrand.post(boost::bind(&Peer::detach, shared_from_this(), rsn, true));
mIOStrand.post(boost::bind (&Peer::detach, shared_from_this(), rsn, true));
return;
}
if (!mDetaching)
@@ -107,7 +234,8 @@ void Peer::detach(const char *rsn, bool onIOStrand)
mSendQ.clear();
(void) mActivityTimer.cancel();
mSocketSsl.async_shutdown(mIOStrand.wrap(boost::bind(&Peer::handleShutdown, shared_from_this(),
mSocketSsl.async_shutdown (mIOStrand.wrap(boost::bind
(&PeerImp::handleShutdown, boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
if (mNodePublic.isValid())
@@ -134,7 +262,7 @@ void Peer::detach(const char *rsn, bool onIOStrand)
}
}
void Peer::handlePingTimer(const boost::system::error_code& ecResult)
void PeerImp::handlePingTimer(const boost::system::error_code& ecResult)
{ // called on IO strand
if (ecResult || mDetaching)
return;
@@ -156,12 +284,14 @@ void Peer::handlePingTimer(const boost::system::error_code& ecResult)
mActive = 0;
mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS));
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(),
mActivityTimer.async_wait (mIOStrand.wrap (boost::bind (
&PeerImp::handlePingTimer,
boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
}
void Peer::handleVerifyTimer(const boost::system::error_code& ecResult)
void PeerImp::handleVerifyTimer(const boost::system::error_code& ecResult)
{
if (ecResult == boost::asio::error::operation_aborted)
{
@@ -184,7 +314,7 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult)
// Begin trying to connect. We are not connected till we know and accept peer's public key.
// Only takes IP addresses (not domains).
void Peer::connect(const std::string& strIp, int iPort)
void PeerImp::connect(const std::string& strIp, int iPort)
{
int iPortAct = (iPort <= 0) ? SYSTEM_PEER_PORT : iPort;
@@ -209,7 +339,10 @@ void Peer::connect(const std::string& strIp, int iPort)
else
{
mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_VERIFY_SECONDS), err);
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handleVerifyTimer, shared_from_this(),
mActivityTimer.async_wait (mIOStrand.wrap (boost::bind (
&PeerImp::handleVerifyTimer,
boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
if (err)
@@ -228,8 +361,8 @@ void Peer::connect(const std::string& strIp, int iPort)
getSocket(),
itrEndpoint,
mIOStrand.wrap(boost::bind(
&Peer::handleConnect,
shared_from_this(),
&PeerImp::handleConnect,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error,
boost::asio::placeholders::iterator)));
}
@@ -239,7 +372,7 @@ void Peer::connect(const std::string& strIp, int iPort)
// Have it say who it is so we know to avoid redundant connections.
// Establish that it really who we are talking to by having it sign a connection detail.
// Also need to establish no man in the middle attack is in progress.
void Peer::handleStart(const boost::system::error_code& error)
void PeerImp::handleStart(const boost::system::error_code& error)
{
if (error)
{
@@ -254,7 +387,7 @@ void Peer::handleStart(const boost::system::error_code& error)
}
// Connect ssl as client.
void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it)
void PeerImp::handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it)
{
if (error)
{
@@ -267,14 +400,17 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip
mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none);
mSocketSsl.async_handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::client,
mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)));
mSocketSsl.async_handshake (boost::asio::ssl::stream <boost::asio::ip::tcp::socket>::client,
mIOStrand.wrap (boost::bind (
&PeerImp::handleStart,
boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
}
}
// Connect ssl as server to an inbound connection.
// - We don't bother remembering the inbound IP or port. Only useful for debugging.
void Peer::connected(const boost::system::error_code& error)
void PeerImp::connected(const boost::system::error_code& error)
{
boost::asio::ip::tcp::endpoint ep = getSocket().remote_endpoint();
int iPort = ep.port();
@@ -296,7 +432,7 @@ void Peer::connected(const boost::system::error_code& error)
mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none);
mSocketSsl.async_handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::server,
mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)));
mIOStrand.wrap(boost::bind(&PeerImp::handleStart, boost::static_pointer_cast <PeerImp> (shared_from_this()), boost::asio::placeholders::error)));
}
else if (!mDetaching)
{
@@ -306,26 +442,27 @@ void Peer::connected(const boost::system::error_code& error)
}
}
void Peer::sendPacketForce(const PackedMessage::pointer& packet)
void PeerImp::sendPacketForce(const PackedMessage::pointer& packet)
{ // must be on IO strand
if (!mDetaching)
{
mSendingPacket = packet;
boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()),
mIOStrand.wrap(boost::bind(&Peer::handleWrite, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
mIOStrand.wrap (boost::bind(&PeerImp::handleWrite,
boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
}
void Peer::sendPacket(const PackedMessage::pointer& packet, bool onStrand)
void PeerImp::sendPacket(const PackedMessage::pointer& packet, bool onStrand)
{
if (packet)
{
if (!onStrand)
{
mIOStrand.post(boost::bind(&Peer::sendPacket, shared_from_this(), packet, true));
mIOStrand.post(boost::bind (&Peer::sendPacket, shared_from_this(), packet, true));
return;
}
if (mSendingPacket)
@@ -339,19 +476,22 @@ void Peer::sendPacket(const PackedMessage::pointer& packet, bool onStrand)
}
}
void Peer::startReadHeader()
void PeerImp::startReadHeader()
{
if (!mDetaching)
{
mReadbuf.clear();
mReadbuf.resize(PackedMessage::kHeaderBytes);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), mIOStrand.wrap(
boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error)));
boost::asio::async_read(mSocketSsl,
boost::asio::buffer(mReadbuf),
mIOStrand.wrap (boost::bind (&PeerImp::handleReadHeader,
boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
}
}
void Peer::startReadBody(unsigned msg_len)
void PeerImp::startReadBody(unsigned msg_len)
{
// m_readbuf already contains the header in its first PackedMessage::kHeaderBytes
// bytes. Expand it to fit in the body as well, and start async
@@ -361,12 +501,15 @@ void Peer::startReadBody(unsigned msg_len)
{
mReadbuf.resize(PackedMessage::kHeaderBytes + msg_len);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[PackedMessage::kHeaderBytes], msg_len),
mIOStrand.wrap(boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error)));
boost::asio::async_read (mSocketSsl,
boost::asio::buffer (&mReadbuf [PackedMessage::kHeaderBytes], msg_len),
mIOStrand.wrap (boost::bind (&PeerImp::handleReadBody,
boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
}
}
void Peer::handleReadHeader(const boost::system::error_code& error)
void PeerImp::handleReadHeader(const boost::system::error_code& error)
{
if (mDetaching)
{
@@ -399,7 +542,7 @@ void Peer::handleReadHeader(const boost::system::error_code& error)
}
}
void Peer::handleReadBody(const boost::system::error_code& error)
void PeerImp::handleReadBody(const boost::system::error_code& error)
{
if (mDetaching)
{
@@ -425,16 +568,16 @@ void Peer::handleReadBody(const boost::system::error_code& error)
startReadHeader();
}
void Peer::processReadBuffer()
void PeerImp::processReadBuffer()
{ // must not hold peer lock
int type = PackedMessage::getType(mReadbuf);
#ifdef DEBUG
// std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-PackedMessage::kHeaderBytes) << std::endl;
#endif
// std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl;
// std::cerr << "PeerImp::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl;
LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPEER, "Peer::read"));
LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPEER, "PeerImp::read"));
ScopedLock sl(theApp->getMasterLock());
@@ -450,7 +593,7 @@ void Peer::processReadBuffer()
{
case ripple::mtHELLO:
{
event->reName("Peer::hello");
event->reName("PeerImp::hello");
ripple::TMHello msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvHello(msg);
@@ -461,7 +604,7 @@ void Peer::processReadBuffer()
case ripple::mtERROR_MSG:
{
event->reName("Peer::errormessage");
event->reName("PeerImp::errormessage");
ripple::TMErrorMsg msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvErrorMessage(msg);
@@ -472,7 +615,7 @@ void Peer::processReadBuffer()
case ripple::mtPING:
{
event->reName("Peer::ping");
event->reName("PeerImp::ping");
ripple::TMPing msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvPing(msg);
@@ -483,7 +626,7 @@ void Peer::processReadBuffer()
case ripple::mtGET_CONTACTS:
{
event->reName("Peer::getcontacts");
event->reName("PeerImp::getcontacts");
ripple::TMGetContacts msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetContacts(msg);
@@ -494,7 +637,7 @@ void Peer::processReadBuffer()
case ripple::mtCONTACT:
{
event->reName("Peer::contact");
event->reName("PeerImp::contact");
ripple::TMContact msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
@@ -506,7 +649,7 @@ void Peer::processReadBuffer()
case ripple::mtGET_PEERS:
{
event->reName("Peer::getpeers");
event->reName("PeerImp::getpeers");
ripple::TMGetPeers msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
@@ -518,7 +661,7 @@ void Peer::processReadBuffer()
case ripple::mtPEERS:
{
event->reName("Peer::peers");
event->reName("PeerImp::peers");
ripple::TMPeers msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
@@ -530,7 +673,7 @@ void Peer::processReadBuffer()
case ripple::mtSEARCH_TRANSACTION:
{
event->reName("Peer::searchtransaction");
event->reName("PeerImp::searchtransaction");
ripple::TMSearchTransaction msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvSearchTransaction(msg);
@@ -541,7 +684,7 @@ void Peer::processReadBuffer()
case ripple::mtGET_ACCOUNT:
{
event->reName("Peer::getaccount");
event->reName("PeerImp::getaccount");
ripple::TMGetAccount msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetAccount(msg);
@@ -552,7 +695,7 @@ void Peer::processReadBuffer()
case ripple::mtACCOUNT:
{
event->reName("Peer::account");
event->reName("PeerImp::account");
ripple::TMAccount msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvAccount(msg);
@@ -563,7 +706,7 @@ void Peer::processReadBuffer()
case ripple::mtTRANSACTION:
{
event->reName("Peer::transaction");
event->reName("PeerImp::transaction");
ripple::TMTransaction msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvTransaction(msg, sl);
@@ -574,7 +717,7 @@ void Peer::processReadBuffer()
case ripple::mtSTATUS_CHANGE:
{
event->reName("Peer::statuschange");
event->reName("PeerImp::statuschange");
ripple::TMStatusChange msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvStatus(msg);
@@ -585,7 +728,7 @@ void Peer::processReadBuffer()
case ripple::mtPROPOSE_LEDGER:
{
event->reName("Peer::propose");
event->reName("PeerImp::propose");
boost::shared_ptr<ripple::TMProposeSet> msg = boost::make_shared<ripple::TMProposeSet>();
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvPropose(msg);
@@ -596,7 +739,7 @@ void Peer::processReadBuffer()
case ripple::mtGET_LEDGER:
{
event->reName("Peer::getledger");
event->reName("PeerImp::getledger");
ripple::TMGetLedger msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetLedger(msg, sl);
@@ -607,7 +750,7 @@ void Peer::processReadBuffer()
case ripple::mtLEDGER_DATA:
{
event->reName("Peer::ledgerdata");
event->reName("PeerImp::ledgerdata");
boost::shared_ptr<ripple::TMLedgerData> msg = boost::make_shared<ripple::TMLedgerData>();
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvLedger(msg, sl);
@@ -618,7 +761,7 @@ void Peer::processReadBuffer()
case ripple::mtHAVE_SET:
{
event->reName("Peer::haveset");
event->reName("PeerImp::haveset");
ripple::TMHaveTransactionSet msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvHaveTxSet(msg);
@@ -629,7 +772,7 @@ void Peer::processReadBuffer()
case ripple::mtVALIDATION:
{
event->reName("Peer::validation");
event->reName("PeerImp::validation");
boost::shared_ptr<ripple::TMValidation> msg = boost::make_shared<ripple::TMValidation>();
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvValidation(msg, sl);
@@ -651,7 +794,7 @@ void Peer::processReadBuffer()
#endif
case ripple::mtGET_OBJECTS:
{
event->reName("Peer::getobjects");
event->reName("PeerImp::getobjects");
boost::shared_ptr<ripple::TMGetObjectByHash> msg = boost::make_shared<ripple::TMGetObjectByHash>();
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetObjectByHash(msg);
@@ -662,7 +805,7 @@ void Peer::processReadBuffer()
case ripple::mtPROOFOFWORK:
{
event->reName("Peer::proofofwork");
event->reName("PeerImp::proofofwork");
ripple::TMProofWork msg;
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvProofWork(msg);
@@ -673,27 +816,20 @@ void Peer::processReadBuffer()
default:
event->reName("Peer::unknown");
event->reName("PeerImp::unknown");
WriteLog (lsWARNING, Peer) << "Unknown Msg: " << type;
WriteLog (lsWARNING, Peer) << strHex(&mReadbuf[0], mReadbuf.size());
}
}
}
void Peer::punishPeer(const boost::weak_ptr<Peer>& wp, LoadType l)
{
Peer::pointer p = wp.lock();
if (p)
p->punishPeer(l);
}
void Peer::recvHello(ripple::TMHello& packet)
void PeerImp::recvHello(ripple::TMHello& packet)
{
bool bDetach = true;
(void) mActivityTimer.cancel();
mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS));
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(),
mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&PeerImp::handlePingTimer, boost::static_pointer_cast <PeerImp> (shared_from_this()),
boost::asio::placeholders::error)));
uint32 ourTime = theApp->getOPs().getNetworkTimeNC();
@@ -862,7 +998,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx
#endif
}
void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder)
void PeerImp::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
Transaction::pointer tx;
@@ -909,9 +1045,10 @@ void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLock
}
// Called from our JobQueue
static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packet,
LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, boost::weak_ptr<Peer> peer)
{ // Called from our JobQueue
{
bool sigGood = false;
bool isTrusted = (job.getType() == jtPROPOSAL_t);
@@ -922,7 +1059,8 @@ static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packe
uint256 prevLedger;
if (set.has_previousledger())
{ // proposal includes a previous ledger
{
// proposal includes a previous ledger
WriteLog (lsTRACE, Peer) << "proposal with previous ledger";
memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8);
if (!proposal->checkSign(set.signature()))
@@ -968,7 +1106,7 @@ static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packe
WriteLog (lsDEBUG, Peer) << "Not relaying untrusted proposal";
}
void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
void PeerImp::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
{
assert(packet);
ripple::TMProposeSet& set = *packet;
@@ -1028,7 +1166,7 @@ void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
mNodePublic, boost::weak_ptr<Peer>(shared_from_this())));
}
void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet)
void PeerImp::recvHaveTxSet(ripple::TMHaveTransactionSet& packet)
{
uint256 hashes;
if (packet.hash().size() != (256 / 8))
@@ -1082,7 +1220,7 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig
#endif
}
void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder)
void PeerImp::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
if (packet->validation().size() < 50)
@@ -1121,22 +1259,22 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet,
#endif
}
void Peer::recvGetValidation(ripple::TMGetValidations& packet)
void PeerImp::recvGetValidation(ripple::TMGetValidations& packet)
{
}
void Peer::recvContact(ripple::TMContact& packet)
void PeerImp::recvContact(ripple::TMContact& packet)
{
}
void Peer::recvGetContacts(ripple::TMGetContacts& packet)
void PeerImp::recvGetContacts(ripple::TMGetContacts& packet)
{
}
// Return a list of your favorite people
// TODO: filter out all the LAN peers
// TODO: filter out the peer you are talking to
void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder)
void PeerImp::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
std::vector<std::string> addrs;
@@ -1168,7 +1306,7 @@ void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder
}
// TODO: filter out all the LAN peers
void Peer::recvPeers(ripple::TMPeers& packet)
void PeerImp::recvPeers(ripple::TMPeers& packet)
{
for (int i = 0; i < packet.nodes().size(); ++i)
{
@@ -1188,7 +1326,7 @@ void Peer::recvPeers(ripple::TMPeers& packet)
}
}
void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& ptr)
void PeerImp::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash>& ptr)
{
ripple::TMGetObjectByHash& packet = *ptr;
@@ -1278,7 +1416,7 @@ void Peer::recvGetObjectByHash(const boost::shared_ptr<ripple::TMGetObjectByHash
}
}
void Peer::recvPing(ripple::TMPing& packet)
void PeerImp::recvPing(ripple::TMPing& packet)
{
if (packet.type() == ripple::TMPing::ptPING)
{
@@ -1291,23 +1429,23 @@ void Peer::recvPing(ripple::TMPing& packet)
}
}
void Peer::recvErrorMessage(ripple::TMErrorMsg& packet)
void PeerImp::recvErrorMessage(ripple::TMErrorMsg& packet)
{
}
void Peer::recvSearchTransaction(ripple::TMSearchTransaction& packet)
void PeerImp::recvSearchTransaction(ripple::TMSearchTransaction& packet)
{
}
void Peer::recvGetAccount(ripple::TMGetAccount& packet)
void PeerImp::recvGetAccount(ripple::TMGetAccount& packet)
{
}
void Peer::recvAccount(ripple::TMAccount& packet)
void PeerImp::recvAccount(ripple::TMAccount& packet)
{
}
void Peer::recvProofWork(ripple::TMProofWork& packet)
void PeerImp::recvProofWork(ripple::TMProofWork& packet)
{
if (packet.has_response())
{ // this is an answer to a proof of work we requested
@@ -1357,8 +1495,10 @@ void Peer::recvProofWork(ripple::TMProofWork& packet)
return;
}
theApp->getJobQueue().addJob(jtPROOFWORK, "recvProof->doProof",
BIND_TYPE(&Peer::doProofOfWork, P_1, boost::weak_ptr<Peer>(shared_from_this()), pow));
theApp->getJobQueue ().addJob (
jtPROOFWORK,
"recvProof->doProof",
BIND_TYPE (&PeerImp::doProofOfWork, P_1, boost::weak_ptr <Peer> (shared_from_this()), pow));
return;
}
@@ -1366,7 +1506,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet)
WriteLog (lsINFO, Peer) << "Received in valid proof of work object from peer";
}
void Peer::recvStatus(ripple::TMStatusChange& packet)
void PeerImp::recvStatus(ripple::TMStatusChange& packet)
{
WriteLog (lsTRACE, Peer) << "Received status change from peer " << getIP();
if (!packet.has_networktime())
@@ -1416,7 +1556,7 @@ void Peer::recvStatus(ripple::TMStatusChange& packet)
mMaxLedger = packet.lastseq();
}
void Peer::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder)
void PeerImp::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHolder)
{
SHAMap::pointer map;
ripple::TMLedgerData reply;
@@ -1663,7 +1803,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet, ScopedLock& MasterLockHold
sendPacket(oPacket, true);
}
void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr, ScopedLock& MasterLockHolder)
void PeerImp::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr, ScopedLock& MasterLockHolder)
{
MasterLockHolder.unlock();
ripple::TMLedgerData& packet = *packet_ptr;
@@ -1730,7 +1870,7 @@ void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr,
punishPeer(LT_UnwantedData);
}
bool Peer::hasLedger(const uint256& hash, uint32 seq) const
bool PeerImp::hasLedger(const uint256& hash, uint32 seq) const
{
if ((seq != 0) && (seq >= mMinLedger) && (seq <= mMaxLedger))
return true;
@@ -1740,7 +1880,7 @@ bool Peer::hasLedger(const uint256& hash, uint32 seq) const
return false;
}
void Peer::addLedger(const uint256& hash)
void PeerImp::addLedger(const uint256& hash)
{
BOOST_FOREACH(const uint256& ledger, mRecentLedgers)
if (ledger == hash)
@@ -1750,7 +1890,7 @@ void Peer::addLedger(const uint256& hash)
mRecentLedgers.push_back(hash);
}
bool Peer::hasTxSet(const uint256& hash) const
bool PeerImp::hasTxSet(const uint256& hash) const
{
BOOST_FOREACH(const uint256& set, mRecentTxSets)
if (set == hash)
@@ -1758,7 +1898,7 @@ bool Peer::hasTxSet(const uint256& hash) const
return false;
}
void Peer::addTxSet(const uint256& hash)
void PeerImp::addTxSet(const uint256& hash)
{
BOOST_FOREACH(const uint256& set, mRecentTxSets)
if (set == hash)
@@ -1770,7 +1910,7 @@ void Peer::addTxSet(const uint256& hash)
// Get session information we can sign to prevent man in the middle attack.
// (both sides get the same information, neither side controls it)
void Peer::getSessionCookie(std::string& strDst)
void PeerImp::getSessionCookie(std::string& strDst)
{
SSL* ssl = mSocketSsl.native_handle();
if (!ssl) throw std::runtime_error("No underlying connection");
@@ -1797,7 +1937,7 @@ void Peer::getSessionCookie(std::string& strDst)
strDst.assign((char *) &sha1[0], sizeof(sha1));
}
void Peer::sendHello()
void PeerImp::sendHello()
{
std::string strCookie;
std::vector<unsigned char> vchSig;
@@ -1832,7 +1972,7 @@ void Peer::sendHello()
sendPacket(packet, true);
}
void Peer::sendGetPeers()
void PeerImp::sendGetPeers()
{
// Ask peer for known other peers.
ripple::TMGetPeers getPeers;
@@ -1844,7 +1984,7 @@ void Peer::sendGetPeers()
sendPacket(packet, true);
}
void Peer::punishPeer(LoadType l)
void PeerImp::punishPeer(LoadType l)
{
if (theApp->getLoadManager().adjust(mLoad, l))
{
@@ -1852,7 +1992,7 @@ void Peer::punishPeer(LoadType l)
}
}
void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer pow)
void PeerImp::doProofOfWork(Job&, boost::weak_ptr <Peer> peer, ProofOfWork::pointer pow)
{
if (peer.expired())
return;
@@ -1879,7 +2019,7 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr<Peer> peer, ProofOfWork::pointer
}
}
void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet)
void PeerImp::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet)
{
if (theApp->getFeeTrack().isLoaded())
{
@@ -1921,12 +2061,12 @@ void Peer::doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packe
boost::weak_ptr<Peer>(shared_from_this()), packet, wantLedger, haveLedger, UptimeTimer::getInstance().getElapsedSeconds ()));
}
bool Peer::hasProto(int version)
bool PeerImp::hasProto(int version)
{
return mHello.has_protoversion() && (mHello.protoversion() >= version);
}
Json::Value Peer::getJson()
Json::Value PeerImp::getJson()
{
Json::Value ret(Json::objectValue);
@@ -1978,4 +2118,21 @@ Json::Value Peer::getJson()
return ret;
}
//------------------------------------------------------------------------------
Peer::pointer Peer::New (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 id,
bool inbound)
{
return Peer::pointer (new PeerImp (io_service, ctx, id, inbound));
}
void Peer::punishPeer(const boost::weak_ptr<Peer>& wp, LoadType l)
{
Peer::pointer p = wp.lock();
if (p)
p->punishPeer(l);
}
// vim:ts=4

View File

@@ -0,0 +1,95 @@
#ifndef __PEER__
#define __PEER__
#include <bitset>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include "Ledger.h"
#include "Transaction.h"
#include "LoadManager.h"
typedef std::pair <std::string,int> ipPort;
class Peer : public boost::enable_shared_from_this <Peer>
{
public:
typedef boost::shared_ptr<Peer> pointer;
typedef const boost::shared_ptr<Peer>& ref;
static int const psbGotHello = 0;
static int const psbSentHello = 1;
static int const psbInMap = 2;
static int const psbTrusted = 3;
static int const psbNoLedgers = 4;
static int const psbNoTransactions = 5;
static int const psbDownLevel = 6;
public:
static pointer New (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 id,
bool inbound);
// VFALCO: TODO see if this and below can be private
virtual void handleConnect (const boost::system::error_code& error,
boost::asio::ip::tcp::resolver::iterator it) = 0;
virtual std::string& getIP () = 0;
virtual std::string getDisplayName() = 0;
virtual int getPort () = 0;
virtual void setIpPort (const std::string& strIP, int iPort) = 0;
virtual boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket() = 0;
virtual void connect (const std::string& strIp, int iPort) = 0;
virtual void connected (const boost::system::error_code& error) = 0;
virtual void detach (const char *, bool onIOStrand) = 0;
//virtual bool samePeer (Peer::ref p) = 0;
//virtual bool samePeer (const Peer& p) = 0;
virtual void sendPacket (const PackedMessage::pointer& packet, bool onStrand) = 0;
virtual void sendGetPeers () = 0;
virtual void punishPeer (LoadType) = 0;
// VFALCO: NOTE, what's with this odd parameter passing? Why the static member?
static void punishPeer (const boost::weak_ptr<Peer>&, LoadType);
virtual Json::Value getJson () = 0;
virtual bool isConnected () const = 0;
virtual bool isInbound () const = 0;
virtual bool isOutbound () const = 0;
virtual const uint256& getClosedLedgerHash () const = 0;
virtual bool hasLedger (const uint256& hash, uint32 seq) const = 0;
virtual bool hasTxSet (const uint256& hash) const = 0;
virtual uint64 getPeerId () const = 0;
virtual const RippleAddress& getNodePublic () const = 0;
virtual void cycleStatus () = 0;
virtual bool hasProto (int version) = 0;
virtual bool hasRange (uint32 uMin, uint32 uMax) = 0;
};
#endif
// vim:ts=4