Move PackedMessage to ripple_data

This commit is contained in:
Vinnie Falco
2013-06-05 06:15:26 -07:00
parent b523b6c8d4
commit 4d1bf35236
23 changed files with 405 additions and 401 deletions

View File

@@ -127,26 +127,6 @@ static void runIO(boost::asio::io_service& io)
io.run();
}
bool Application::isNew(const uint256& s)
{
return mHashRouter->addSuppression(s);
}
bool Application::isNew(const uint256& s, uint64 p)
{
return mHashRouter->addSuppressionPeer(s, p);
}
bool Application::isNew(const uint256& s, uint64 p, int& f)
{
return mHashRouter->addSuppressionPeer(s, p, f);
}
bool Application::isNewFlag(const uint256& s, int f)
{
return mHashRouter->setFlag(s, f);
}
void Application::setup()
{
mJobQueue.setThreadCount();

View File

@@ -110,7 +110,6 @@ public:
NodeCache& getTempNodeCache() { return mTempNodeCache; }
HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; }
JobQueue& getJobQueue() { return mJobQueue; }
IHashRouter& getHashRouter() { return *mHashRouter; }
boost::recursive_mutex& getMasterLock() { return mMasterLock; }
ProofOfWorkGenerator& getPowGen() { return mPOWGen; }
LoadManager& getLoadManager() { return mLoadMgr; }
@@ -118,18 +117,13 @@ public:
PeerDoor& getPeerDoor() { return *mPeerDoor; }
OrderBookDB& getOrderBookDB() { return mOrderBookDB; }
SLECache& getSLECache() { return mSLECache; }
IFeatures& getFeatureTable() { return *mFeatures; }
IFeeVote& getFeeVote() { return *mFeeVote; }
IFeatures& getFeatureTable() { return *mFeatures; }
ILoadFeeTrack& getFeeTrack() { return *mFeeTrack; }
IFeeVote& getFeeVote() { return *mFeeVote; }
IHashRouter& getHashRouter() { return *mHashRouter; }
IValidations& getValidations() { return *mValidations; }
// VFALCO: TODO, eliminate these, change callers to just call IHashRouter directly!
bool isNew(const uint256& s);
bool isNew(const uint256& s, uint64 p);
bool isNew(const uint256& s, uint64 p, int& f);
bool isNewFlag(const uint256& s, int f);
// VFALCO: TODO, Move these to the .cpp
bool running() { return mTxnDB != NULL; } // VFALCO: TODO, replace with nullptr when beast is available
bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); }

View File

@@ -7,7 +7,6 @@
#include <boost/thread/mutex.hpp>
#include "Peer.h"
#include "PackedMessage.h"
//
// Access to the Ripple network.

View File

@@ -1574,7 +1574,7 @@ uint32 Ledger::roundCloseTime(uint32 closeTime, uint32 closeResolution)
void Ledger::pendSave(bool fromConsensus)
{
if (!fromConsensus && !theApp->isNewFlag(getHash(), SF_SAVED))
if (!fromConsensus && !theApp->getHashRouter ().setFlag (getHash(), SF_SAVED))
return;
assert(isImmutable());

View File

@@ -188,7 +188,7 @@ void LedgerConsensus::checkOurValidation()
(mPreviousLedger->getHash(), theApp->getOPs().getValidationTimeNC(), mValPublic, false);
v->setTrusted();
v->sign(signingHash, mValPrivate);
theApp->isNew(signingHash);
theApp->getHashRouter ().addSuppression (signingHash);
theApp->getValidations().addValidation(v, "localMissing");
std::vector<unsigned char> validation = v->getSigned();
ripple::TMValidation val;
@@ -561,6 +561,7 @@ void LedgerConsensus::stateAccepted()
endConsensus();
}
// VFALCO: TODO implement shutdown without a naked global
extern volatile bool doShutdown;
void LedgerConsensus::timerEntry()
@@ -858,7 +859,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
txn->setVote(pit.first, cit->second->hasItem(txID));
}
if (theApp->isNewFlag(txID, SF_RELAYED))
if (theApp->getHashRouter ().setFlag (txID, SF_RELAYED))
{
ripple::TMTransaction msg;
msg.set_rawtransaction(&(tx.front()), tx.size());
@@ -1033,7 +1034,7 @@ int LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTrans
TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE;
if (retryAssured)
parms = static_cast<TransactionEngineParams>(parms | tapRETRY);
if (theApp->isNewFlag(txn->getTransactionID(), SF_SIGGOOD))
if (theApp->getHashRouter ().setFlag (txn->getTransactionID(), SF_SIGGOOD))
parms = static_cast<TransactionEngineParams>(parms | tapNO_CHECK_SIGN);
WriteLog (lsDEBUG, LedgerConsensus) << "TXN " << txn->getTransactionID()
@@ -1223,7 +1224,7 @@ void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer)
}
v->sign(signingHash, mValPrivate);
v->setTrusted();
theApp->isNew(signingHash); // suppress it if we receive it
theApp->getHashRouter ().addSuppression (signingHash); // suppress it if we receive it
theApp->getValidations().addValidation(v, "local");
theApp->getOPs().setLastValidation(v);
std::vector<unsigned char> validation = v->getSigned();

View File

@@ -27,7 +27,16 @@ class TransactionAcquire
public:
typedef boost::shared_ptr<TransactionAcquire> pointer;
protected:
public:
TransactionAcquire(const uint256& hash);
virtual ~TransactionAcquire() { ; }
SHAMap::ref getMap() { return mMap; }
SMAddNode takeNodes(const std::list<SHAMapNode>& IDs,
const std::list< std::vector<unsigned char> >& data, Peer::ref);
private:
SHAMap::pointer mMap;
bool mHaveRoot;
@@ -37,26 +46,11 @@ protected:
void done();
void trigger(Peer::ref);
boost::weak_ptr<PeerSet> pmDowncast();
public:
TransactionAcquire(const uint256& hash);
virtual ~TransactionAcquire() { ; }
SHAMap::ref getMap() { return mMap; }
SMAddNode takeNodes(const std::list<SHAMapNode>& IDs,
const std::list< std::vector<unsigned char> >& data, Peer::ref);
};
// A transaction that may be disputed
class LCTransaction
{ // A transaction that may be disputed
protected:
uint256 mTransactionID;
int mYays, mNays;
bool mOurVote;
Serializer transaction;
boost::unordered_map<uint160, bool> mVotes;
{
public:
typedef boost::shared_ptr<LCTransaction> pointer;
@@ -73,6 +67,13 @@ public:
bool updateVote(int percentTime, bool proposing);
Json::Value getJson();
private:
uint256 mTransactionID;
int mYays, mNays;
bool mOurVote;
Serializer transaction;
boost::unordered_map<uint160, bool> mVotes;
};
enum LCState
@@ -85,76 +86,6 @@ enum LCState
class LedgerConsensus : public boost::enable_shared_from_this<LedgerConsensus>, IS_INSTANCE(LedgerConsensus)
{
protected:
LCState mState;
uint32 mCloseTime; // The wall time this ledger closed
uint256 mPrevLedgerHash, mNewLedgerHash;
Ledger::pointer mPreviousLedger;
LedgerAcquire::pointer mAcquiringLedger;
LedgerProposal::pointer mOurPosition;
RippleAddress mValPublic, mValPrivate;
bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail;
int mCurrentMSeconds, mClosePercent, mCloseResolution;
bool mHaveCloseTimeConsensus;
boost::posix_time::ptime mConsensusStartTime;
int mPreviousProposers;
int mPreviousMSeconds;
// Convergence tracking, trusted peers indexed by hash of public key
boost::unordered_map<uint160, LedgerProposal::pointer> mPeerPositions;
// Transaction Sets, indexed by hash of transaction tree
boost::unordered_map<uint256, SHAMap::pointer> mAcquired;
boost::unordered_map<uint256, TransactionAcquire::pointer> mAcquiring;
// Peer sets
boost::unordered_map<uint256, std::vector< boost::weak_ptr<Peer> > > mPeerData;
// Disputed transactions
boost::unordered_map<uint256, LCTransaction::pointer> mDisputes;
// Close time estimates
std::map<uint32, int> mCloseTimes;
// nodes that have bowed out of this consensus process
boost::unordered_set<uint160> mDeadNodes;
// final accept logic
void accept(SHAMap::ref txSet, LoadEvent::pointer);
void weHave(const uint256& id, Peer::ref avoidPeer);
void startAcquiring(TransactionAcquire::pointer);
SHAMap::pointer find(const uint256& hash);
void createDisputes(SHAMap::ref, SHAMap::ref);
void addDisputedTransaction(const uint256&, const std::vector<unsigned char>& transaction);
void adjustCount(SHAMap::ref map, const std::vector<uint160>& peers);
void propose();
void addPosition(LedgerProposal&, bool ours);
void removePosition(LedgerProposal&, bool ours);
void sendHaveTxSet(const uint256& set, bool direct);
void applyTransactions(SHAMap::ref transactionSet, Ledger::ref targetLedger,
Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr);
int applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref targetLedger,
bool openLgr, bool retryAssured);
uint32 roundCloseTime(uint32 closeTime);
// manipulating our own position
void statusChange(ripple::NodeEvent, Ledger& ledger);
void takeInitialPosition(Ledger& initialLedger);
void updateOurPositions();
void playbackProposals();
int getThreshold();
void closeLedger();
void checkOurValidation();
void beginAccept(bool synchronous);
void endConsensus();
public:
LedgerConsensus(const uint256& prevLCLHash, Ledger::ref previousLedger, uint32 closeTime);
@@ -193,6 +124,77 @@ public:
// test/debug
void simulate();
private:
// final accept logic
void accept(SHAMap::ref txSet, LoadEvent::pointer);
void weHave(const uint256& id, Peer::ref avoidPeer);
void startAcquiring(TransactionAcquire::pointer);
SHAMap::pointer find(const uint256& hash);
void createDisputes(SHAMap::ref, SHAMap::ref);
void addDisputedTransaction(const uint256&, const std::vector<unsigned char>& transaction);
void adjustCount(SHAMap::ref map, const std::vector<uint160>& peers);
void propose();
void addPosition(LedgerProposal&, bool ours);
void removePosition(LedgerProposal&, bool ours);
void sendHaveTxSet(const uint256& set, bool direct);
void applyTransactions(SHAMap::ref transactionSet, Ledger::ref targetLedger,
Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr);
int applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref targetLedger,
bool openLgr, bool retryAssured);
uint32 roundCloseTime(uint32 closeTime);
// manipulating our own position
void statusChange(ripple::NodeEvent, Ledger& ledger);
void takeInitialPosition(Ledger& initialLedger);
void updateOurPositions();
void playbackProposals();
int getThreshold();
void closeLedger();
void checkOurValidation();
void beginAccept(bool synchronous);
void endConsensus();
private:
LCState mState;
uint32 mCloseTime; // The wall time this ledger closed
uint256 mPrevLedgerHash, mNewLedgerHash;
Ledger::pointer mPreviousLedger;
LedgerAcquire::pointer mAcquiringLedger;
LedgerProposal::pointer mOurPosition;
RippleAddress mValPublic, mValPrivate;
bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail;
int mCurrentMSeconds, mClosePercent, mCloseResolution;
bool mHaveCloseTimeConsensus;
boost::posix_time::ptime mConsensusStartTime;
int mPreviousProposers;
int mPreviousMSeconds;
// Convergence tracking, trusted peers indexed by hash of public key
boost::unordered_map<uint160, LedgerProposal::pointer> mPeerPositions;
// Transaction Sets, indexed by hash of transaction tree
boost::unordered_map<uint256, SHAMap::pointer> mAcquired;
boost::unordered_map<uint256, TransactionAcquire::pointer> mAcquiring;
// Peer sets
boost::unordered_map<uint256, std::vector< boost::weak_ptr<Peer> > > mPeerData;
// Disputed transactions
boost::unordered_map<uint256, LCTransaction::pointer> mDisputes;
// Close time estimates
std::map<uint32, int> mCloseTimes;
// nodes that have bowed out of this consensus process
boost::unordered_set<uint160> mDeadNodes;
};

View File

@@ -118,7 +118,7 @@ Ledger::pointer LedgerMaster::closeLedger(bool recover)
{
TransactionEngineParams tepFlags = tapOPEN_LEDGER;
if (theApp->isNew(it->first.getTXID(), SF_SIGGOOD))
if (theApp->getHashRouter ().addSuppressionPeer (it->first.getTXID(), SF_SIGGOOD))
tepFlags = static_cast<TransactionEngineParams>(tepFlags | tapNO_CHECK_SIGN);
bool didApply;

View File

@@ -5,6 +5,7 @@
#include <boost/thread/mutex.hpp>
// VFALCO: TODO replace LT_ with loadType in constants
enum LoadType
{ // types of load that can be placed on the server
@@ -44,8 +45,9 @@ public:
LoadCost(LoadType t, int cost, int cat) : mType(t), mCost(cost), mCategories(cat) { ; }
};
// a single endpoint that can impose load
class LoadSource
{ // a single endpoint that can impose load
{
private:
// VFALCO: Make this not a friend
friend class LoadManager;
@@ -97,9 +99,9 @@ private:
bool mLogged;
};
// a collection of load sources
class LoadManager
{ // a collection of load sources
{
public:
LoadManager(int creditRate = 100, int creditLimit = 500, int debitWarn = -500, int debitLimit = -1000);

View File

@@ -167,7 +167,7 @@ void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans,
uint256 suppress = trans->getTransactionID();
int flags;
if (theApp->isNew(suppress, 0, flags) && ((flags & SF_RETRY) != 0))
if (theApp->getHashRouter ().addSuppressionPeer (suppress, 0, flags) && ((flags & SF_RETRY) != 0))
{
WriteLog (lsWARNING, NetworkOPs) << "Redundant transactions submitted";
return;
@@ -186,10 +186,10 @@ void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans,
if (!trans->checkSign())
{
WriteLog (lsWARNING, NetworkOPs) << "Submitted transaction has bad signature";
theApp->isNewFlag(suppress, SF_BAD);
theApp->getHashRouter ().setFlag (suppress, SF_BAD);
return;
}
theApp->isNewFlag(suppress, SF_SIGGOOD);
theApp->getHashRouter ().setFlag (suppress, SF_SIGGOOD);
}
catch (...)
{
@@ -260,9 +260,9 @@ void NetworkOPs::runTransactionQueue()
dbtx->setResult(r);
if (isTemMalformed(r)) // malformed, cache bad
theApp->isNewFlag(txn->getID(), SF_BAD);
theApp->getHashRouter ().setFlag (txn->getID(), SF_BAD);
else if(isTelLocal(r) || isTerRetry(r)) // can be retried
theApp->isNewFlag(txn->getID(), SF_RETRY);
theApp->getHashRouter ().setFlag (txn->getID(), SF_RETRY);
if (isTerRetry(r))
@@ -333,10 +333,10 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
WriteLog (lsINFO, NetworkOPs) << "Transaction has bad signature";
trans->setStatus(INVALID);
trans->setResult(temBAD_SIGNATURE);
theApp->isNewFlag(trans->getID(), SF_BAD);
theApp->getHashRouter ().setFlag (trans->getID(), SF_BAD);
return trans;
}
theApp->isNewFlag(trans->getID(), SF_SIGGOOD);
theApp->getHashRouter ().setFlag (trans->getID(), SF_SIGGOOD);
}
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
@@ -347,9 +347,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans,
trans->setResult(r);
if (isTemMalformed(r)) // malformed, cache bad
theApp->isNewFlag(trans->getID(), SF_BAD);
theApp->getHashRouter ().setFlag (trans->getID(), SF_BAD);
else if(isTelLocal(r) || isTerRetry(r)) // can be retried
theApp->isNewFlag(trans->getID(), SF_RETRY);
theApp->getHashRouter ().setFlag (trans->getID(), SF_RETRY);
#ifdef DEBUG
if (r != tesSUCCESS)

View File

@@ -1,53 +0,0 @@
#include "PackedMessage.h"
void PackedMessage::encodeHeader(unsigned size, int type)
{
assert(mBuffer.size() >= HEADER_SIZE);
mBuffer[0] = static_cast<boost::uint8_t>((size >> 24) & 0xFF);
mBuffer[1] = static_cast<boost::uint8_t>((size >> 16) & 0xFF);
mBuffer[2] = static_cast<boost::uint8_t>((size >> 8) & 0xFF);
mBuffer[3] = static_cast<boost::uint8_t>(size & 0xFF);
mBuffer[4] = static_cast<boost::uint8_t>((type >> 8) & 0xFF);
mBuffer[5] = static_cast<boost::uint8_t>(type & 0xFF);
}
PackedMessage::PackedMessage(const ::google::protobuf::Message &message, int type)
{
unsigned msg_size = message.ByteSize();
assert(msg_size);
mBuffer.resize(HEADER_SIZE + msg_size);
encodeHeader(msg_size, type);
if (msg_size)
{
message.SerializeToArray(&mBuffer[HEADER_SIZE], msg_size);
#ifdef DEBUG
// std::cerr << "PackedMessage: type=" << type << ", datalen=" << msg_size << std::endl;
#endif
}
}
bool PackedMessage::operator == (const PackedMessage& other)
{
return (mBuffer == other.mBuffer);
}
unsigned PackedMessage::getLength(std::vector<uint8_t>& buf)
{
if(buf.size() < HEADER_SIZE)
return 0;
int ret = buf[0];
ret <<= 8; ret |= buf[1]; ret <<= 8; ret |= buf[2]; ret <<= 8; ret |= buf[3];
return ret;
}
int PackedMessage::getType(std::vector<uint8_t>& buf)
{
if(buf.size() < HEADER_SIZE)
return 0;
int ret = buf[4];
ret <<= 8; ret |= buf[5];
return ret;
}

View File

@@ -1,74 +0,0 @@
//
// packaging of messages into length/type-prepended buffers
// ready for transmission.
#ifndef PACKEDMESSAGE_H
#define PACKEDMESSAGE_H
#include <string>
#include <cassert>
#include <vector>
#include <cstdio>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/cstdint.hpp>
// The header size for packed messages
// len(4)+type(2)
const unsigned HEADER_SIZE = 6;
// PackedMessage implements simple "packing" of protocol buffers Messages into
// a string prepended by a header specifying the message length.
// MessageType should be a Message class generated by the protobuf compiler.
//
class PackedMessage : public boost::enable_shared_from_this<PackedMessage>
{
std::vector<uint8_t> mBuffer;
// Encodes the size and type into a header at the beginning of buf
//
void encodeHeader(unsigned size, int type);
public:
typedef boost::shared_ptr< ::google::protobuf::Message > MessagePointer;
typedef boost::shared_ptr<PackedMessage> pointer;
PackedMessage(const ::google::protobuf::Message& message, int type);
std::vector<uint8_t>& getBuffer() { return(mBuffer); }
static unsigned getLength(std::vector<uint8_t>& buf);
static int getType(std::vector<uint8_t>& buf);
bool operator == (const PackedMessage& other);
/*
void setMsg(MessagePointer msg, int type);
MessagePointer getMsg();
// Pack the message into the given data_buffer. The buffer is resized to
// exactly fit the message.
// Return false in case of an error, true if successful.
//
bool pack(data_buffer& buf) const;
// Given a buffer with the first HEADER_SIZE bytes representing the header,
// decode the header and return the message length. Return 0 in case of
// an error.
//
unsigned decodeHeader(const data_buffer& buf) const;
// Unpack and store a message from the given packed buffer.
// Return true if unpacking successful, false otherwise.
//
bool unpack(const data_buffer& buf);
*/
};
#endif /* PACKEDMESSAGE_H */

View File

@@ -345,7 +345,7 @@ void Peer::startReadHeader()
if (!mDetaching)
{
mReadbuf.clear();
mReadbuf.resize(HEADER_SIZE);
mReadbuf.resize(PackedMessage::kHeaderBytes);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), mIOStrand.wrap(
boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error)));
@@ -354,15 +354,15 @@ void Peer::startReadHeader()
void Peer::startReadBody(unsigned msg_len)
{
// m_readbuf already contains the header in its first HEADER_SIZE
// m_readbuf already contains the header in its first PackedMessage::kHeaderBytes
// bytes. Expand it to fit in the body as well, and start async
// read into the body.
if (!mDetaching)
{
mReadbuf.resize(HEADER_SIZE + msg_len);
mReadbuf.resize(PackedMessage::kHeaderBytes + msg_len);
boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len),
boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[PackedMessage::kHeaderBytes], msg_len),
mIOStrand.wrap(boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error)));
}
}
@@ -430,7 +430,7 @@ void Peer::processReadBuffer()
{ // must not hold peer lock
int type = PackedMessage::getType(mReadbuf);
#ifdef DEBUG
// std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-HEADER_SIZE) << std::endl;
// std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-PackedMessage::kHeaderBytes) << std::endl;
#endif
// std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl;
@@ -453,7 +453,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::hello");
ripple::TMHello msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvHello(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -464,7 +464,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::errormessage");
ripple::TMErrorMsg msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvErrorMessage(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -475,7 +475,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::ping");
ripple::TMPing msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvPing(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -486,7 +486,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::getcontacts");
ripple::TMGetContacts msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetContacts(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -498,7 +498,7 @@ void Peer::processReadBuffer()
event->reName("Peer::contact");
ripple::TMContact msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvContact(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -510,7 +510,7 @@ void Peer::processReadBuffer()
event->reName("Peer::getpeers");
ripple::TMGetPeers msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetPeers(msg, sl);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -522,7 +522,7 @@ void Peer::processReadBuffer()
event->reName("Peer::peers");
ripple::TMPeers msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvPeers(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -533,7 +533,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::searchtransaction");
ripple::TMSearchTransaction msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvSearchTransaction(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -544,7 +544,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::getaccount");
ripple::TMGetAccount msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetAccount(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -555,7 +555,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::account");
ripple::TMAccount msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvAccount(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -566,7 +566,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::transaction");
ripple::TMTransaction msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvTransaction(msg, sl);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -577,7 +577,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::statuschange");
ripple::TMStatusChange msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvStatus(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -588,7 +588,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::propose");
boost::shared_ptr<ripple::TMProposeSet> msg = boost::make_shared<ripple::TMProposeSet>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvPropose(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -599,7 +599,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::getledger");
ripple::TMGetLedger msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetLedger(msg, sl);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -610,7 +610,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::ledgerdata");
boost::shared_ptr<ripple::TMLedgerData> msg = boost::make_shared<ripple::TMLedgerData>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvLedger(msg, sl);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -621,7 +621,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::haveset");
ripple::TMHaveTransactionSet msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvHaveTxSet(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -632,7 +632,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::validation");
boost::shared_ptr<ripple::TMValidation> msg = boost::make_shared<ripple::TMValidation>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvValidation(msg, sl);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -642,7 +642,7 @@ void Peer::processReadBuffer()
case ripple::mtGET_VALIDATION:
{
ripple::TM msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recv(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -654,7 +654,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::getobjects");
boost::shared_ptr<ripple::TMGetObjectByHash> msg = boost::make_shared<ripple::TMGetObjectByHash>();
if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvGetObjectByHash(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -665,7 +665,7 @@ void Peer::processReadBuffer()
{
event->reName("Peer::proofofwork");
ripple::TMProofWork msg;
if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE))
if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes))
recvProofWork(msg);
else
WriteLog (lsWARNING, Peer) << "parse error: " << type;
@@ -876,7 +876,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLock
SerializedTransaction::pointer stx = boost::make_shared<SerializedTransaction>(boost::ref(sit));
int flags;
if (!theApp->isNew(stx->getTransactionID(), mPeerId, flags))
if (! theApp->getHashRouter ().addSuppressionPeer (stx->getTransactionID(), mPeerId, flags))
{ // we have seen this transaction recently
if ((flags & SF_BAD) != 0)
{
@@ -1004,7 +1004,7 @@ void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
s.add256(prevLedger);
uint256 suppression = s.getSHA512Half();
if (!theApp->isNew(suppression, mPeerId))
if (! theApp->getHashRouter ().addSuppressionPeer (suppression, mPeerId))
{
WriteLog (lsTRACE, Peer) << "Received duplicate proposal from peer " << mPeerId;
return;
@@ -1102,7 +1102,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet,
SerializedValidation::pointer val = boost::make_shared<SerializedValidation>(boost::ref(sit), false);
uint256 signingHash = val->getSigningHash();
if (!theApp->isNew(signingHash, mPeerId))
if (! theApp->getHashRouter ().addSuppressionPeer (signingHash, mPeerId))
{
WriteLog (lsTRACE, Peer) << "Validation is duplicate";
return;

View File

@@ -8,7 +8,6 @@
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include "PackedMessage.h"
#include "Ledger.h"
#include "Transaction.h"
#include "ProofOfWork.h"
@@ -18,16 +17,73 @@ typedef std::pair<std::string,int> ipPort;
DEFINE_INSTANCE(Peer);
class Peer : public boost::enable_shared_from_this<Peer>, public IS_INSTANCE(Peer)
class Peer : public boost::enable_shared_from_this <Peer>
, public IS_INSTANCE (Peer)
{
public:
typedef boost::shared_ptr<Peer> pointer;
typedef const boost::shared_ptr<Peer>& ref;
static const int psbGotHello = 0, psbSentHello = 1, psbInMap = 2, psbTrusted = 3;
static const int psbNoLedgers = 4, psbNoTransactions = 5, psbDownLevel = 6;
static int const psbGotHello = 0;
static int const psbSentHello = 1;
static int const psbInMap = 2;
static int const psbTrusted = 3;
static int const psbNoLedgers = 4;
static int const psbNoTransactions = 5;
static int const psbDownLevel = 6;
void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
public:
//bool operator == (const Peer& other);
void handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it);
std::string& getIP() { return mIpPort.first; }
std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; }
int getPort() { return mIpPort.second; }
void setIpPort(const std::string& strIP, int iPort);
static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id, bool inbound)
{
return pointer(new Peer(io_service, ctx, id, inbound));
}
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket()
{
return mSocketSsl.lowest_layer();
}
void connect(const std::string& strIp, int iPort);
void connected(const boost::system::error_code& error);
void detach(const char *, bool onIOStrand);
bool samePeer(Peer::ref p) { return samePeer(*p); }
bool samePeer(const Peer& p) { return this == &p; }
void sendPacket(const PackedMessage::pointer& packet, bool onStrand);
void sendLedgerProposal(Ledger::ref ledger);
void sendFullLedger(Ledger::ref ledger);
void sendGetFullLedger(uint256& hash);
void sendGetPeers();
void punishPeer(LoadType);
// VFALCO: NOTE, what's with this odd parameter passing? Why the static member?
static void punishPeer(const boost::weak_ptr<Peer>&, LoadType);
Json::Value getJson();
bool isConnected() const { return mHelloed && !mDetaching; }
bool isInbound() const { return mInbound; }
bool isOutbound() const { return !mInbound; }
const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; }
bool hasLedger(const uint256& hash, uint32 seq) const;
bool hasTxSet(const uint256& hash) const;
uint64 getPeerId() const { return mPeerId; }
const RippleAddress& getNodePublic() const { return mNodePublic; }
void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); }
bool hasProto(int version);
bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); }
private:
bool mInbound; // Connection is inbound
@@ -59,7 +115,7 @@ private:
void handleVerifyTimer(const boost::system::error_code& ecResult);
void handlePingTimer(const boost::system::error_code& ecResult);
protected:
private:
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
@@ -110,57 +166,8 @@ protected:
void doFetchPack(const boost::shared_ptr<ripple::TMGetObjectByHash>& packet);
// VFALCO: NOTE, why is this a static member instead of a regular member?
static void doProofOfWork(Job&, boost::weak_ptr<Peer>, ProofOfWork::pointer);
public:
//bool operator == (const Peer& other);
std::string& getIP() { return mIpPort.first; }
std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; }
int getPort() { return mIpPort.second; }
void setIpPort(const std::string& strIP, int iPort);
static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id, bool inbound)
{
return pointer(new Peer(io_service, ctx, id, inbound));
}
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket()
{
return mSocketSsl.lowest_layer();
}
void connect(const std::string& strIp, int iPort);
void connected(const boost::system::error_code& error);
void detach(const char *, bool onIOStrand);
bool samePeer(Peer::ref p) { return samePeer(*p); }
bool samePeer(const Peer& p) { return this == &p; }
void sendPacket(const PackedMessage::pointer& packet, bool onStrand);
void sendLedgerProposal(Ledger::ref ledger);
void sendFullLedger(Ledger::ref ledger);
void sendGetFullLedger(uint256& hash);
void sendGetPeers();
void punishPeer(LoadType);
static void punishPeer(const boost::weak_ptr<Peer>&, LoadType);
Json::Value getJson();
bool isConnected() const { return mHelloed && !mDetaching; }
bool isInbound() const { return mInbound; }
bool isOutbound() const { return !mInbound; }
const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; }
bool hasLedger(const uint256& hash, uint32 seq) const;
bool hasTxSet(const uint256& hash) const;
uint64 getPeerId() const { return mPeerId; }
const RippleAddress& getNodePublic() const { return mNodePublic; }
void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); }
bool hasProto(int version);
bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); }
};
#endif

View File

@@ -12,6 +12,11 @@ Handles incoming connections from other Peers
class PeerDoor
{
public:
PeerDoor (boost::asio::io_service& io_service);
boost::asio::ssl::context& getSSLContext() { return mCtx; }
private:
boost::asio::ip::tcp::acceptor mAcceptor;
boost::asio::ssl::context mCtx;
@@ -19,10 +24,6 @@ private:
void startListening();
void handleConnect(Peer::pointer new_connection, const boost::system::error_code& error);
public:
PeerDoor(boost::asio::io_service& io_service);
boost::asio::ssl::context& getSSLContext() { return mCtx; }
};
#endif

View File

@@ -446,8 +446,9 @@ SHAMapItem::pointer SHAMap::peekNextItem(const uint256& id, SHAMapTreeNode::TNTy
return no_item;
}
// Get a pointer to the previous item in the tree after a given item - item must be in tree
SHAMapItem::pointer SHAMap::peekPrevItem(const uint256& id)
{ // Get a pointer to the previous item in the tree after a given item - item must be in tree
{
boost::recursive_mutex::scoped_lock sl(mLock);
std::stack<SHAMapTreeNode::pointer> stack = getStack(id, true, false);
@@ -461,17 +462,25 @@ SHAMapItem::pointer SHAMap::peekPrevItem(const uint256& id)
if (node->peekItem()->getTag() < id)
return node->peekItem();
}
else for (int i = node->selectBranch(id) - 1; i >= 0; --i)
if (!node->isEmptyBranch(i))
{
node = getNode(node->getChildNodeID(i), node->getChildHash(i), false);
SHAMapTreeNode* item = firstBelow(node.get());
if (!item)
throw std::runtime_error("missing node");
return item->peekItem();
}
else
{
for (int i = node->selectBranch(id) - 1; i >= 0; --i)
{
if (!node->isEmptyBranch(i))
{
node = getNode(node->getChildNodeID(i), node->getChildHash(i), false);
SHAMapTreeNode* item = firstBelow(node.get());
if (!item)
throw std::runtime_error("missing node");
return item->peekItem();
}
}
}
}
// must be last item
// must be last item
return no_item;
}