From 4d1bf35236e8d4f4da6b73138990a9e2ec7b5fa2 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 5 Jun 2013 06:15:26 -0700 Subject: [PATCH] Move PackedMessage to ripple_data --- .../containers/ripple_TaggedCache.h | 65 ++++--- .../protocol/ripple_PackedMessage.cpp | 66 +++++++ .../protocol/ripple_PackedMessage.h | 55 ++++++ modules/ripple_data/ripple_data.cpp | 1 + modules/ripple_data/ripple_data.h | 12 +- modules/ripple_main/ripple_main.cpp | 2 - newcoin.vcxproj | 7 + newcoin.vcxproj.filters | 6 + src/cpp/ripple/Application.cpp | 20 -- src/cpp/ripple/Application.h | 12 +- src/cpp/ripple/ConnectionPool.h | 1 - src/cpp/ripple/Ledger.cpp | 2 +- src/cpp/ripple/LedgerConsensus.cpp | 9 +- src/cpp/ripple/LedgerConsensus.h | 178 +++++++++--------- src/cpp/ripple/LedgerMaster.cpp | 2 +- src/cpp/ripple/LoadManager.h | 8 +- src/cpp/ripple/NetworkOPs.cpp | 18 +- src/cpp/ripple/PackedMessage.cpp | 53 ------ src/cpp/ripple/PackedMessage.h | 74 -------- src/cpp/ripple/Peer.cpp | 56 +++--- src/cpp/ripple/Peer.h | 119 ++++++------ src/cpp/ripple/PeerDoor.h | 9 +- src/cpp/ripple/SHAMap.cpp | 31 +-- 23 files changed, 405 insertions(+), 401 deletions(-) create mode 100644 modules/ripple_data/protocol/ripple_PackedMessage.cpp create mode 100644 modules/ripple_data/protocol/ripple_PackedMessage.h delete mode 100644 src/cpp/ripple/PackedMessage.cpp delete mode 100644 src/cpp/ripple/PackedMessage.h diff --git a/modules/ripple_basics/containers/ripple_TaggedCache.h b/modules/ripple_basics/containers/ripple_TaggedCache.h index a0e69e3401..9f9c5db285 100644 --- a/modules/ripple_basics/containers/ripple_TaggedCache.h +++ b/modules/ripple_basics/containers/ripple_TaggedCache.h @@ -33,39 +33,6 @@ public: typedef boost::weak_ptr weak_data_ptr; typedef boost::shared_ptr data_ptr; -protected: - - class cache_entry - { - public: - int last_use; - data_ptr ptr; - weak_data_ptr weak_ptr; - - cache_entry(int l, const data_ptr& d) : last_use(l), ptr(d), weak_ptr(d) { ; } - bool isWeak() { return !ptr; } - bool isCached() { return !!ptr; } - bool isExpired() { return weak_ptr.expired(); } - data_ptr lock() { return weak_ptr.lock(); } - void touch() { last_use = Timer::getElapsedSeconds (); } - }; - - typedef std::pair cache_pair; - typedef boost::unordered_map cache_type; - typedef typename cache_type::iterator cache_iterator; - - mutable boost::recursive_mutex mLock; - - std::string mName; // Used for logging - int mTargetSize; // Desired number of cache entries (0 = ignore) - int mTargetAge; // Desired maximum cache age - int mCacheCount; // Number of items cached - - cache_type mCache; // Hold strong reference to recent objects - int mLastSweep; - - uint64 mHits, mMisses; - public: TaggedCache(const char *name, int size, int age) : mName(name) @@ -99,6 +66,38 @@ public: bool retrieve(const key_type& key, c_Data& data); boost::recursive_mutex& peekMutex() { return mLock; } + +private: + class cache_entry + { + public: + int last_use; + data_ptr ptr; + weak_data_ptr weak_ptr; + + cache_entry(int l, const data_ptr& d) : last_use(l), ptr(d), weak_ptr(d) { ; } + bool isWeak() { return !ptr; } + bool isCached() { return !!ptr; } + bool isExpired() { return weak_ptr.expired(); } + data_ptr lock() { return weak_ptr.lock(); } + void touch() { last_use = Timer::getElapsedSeconds (); } + }; + + typedef std::pair cache_pair; + typedef boost::unordered_map cache_type; + typedef typename cache_type::iterator cache_iterator; + + mutable boost::recursive_mutex mLock; + + std::string mName; // Used for logging + int mTargetSize; // Desired number of cache entries (0 = ignore) + int mTargetAge; // Desired maximum cache age + int mCacheCount; // Number of items cached + + cache_type mCache; // Hold strong reference to recent objects + int mLastSweep; + + uint64 mHits, mMisses; }; template diff --git a/modules/ripple_data/protocol/ripple_PackedMessage.cpp b/modules/ripple_data/protocol/ripple_PackedMessage.cpp new file mode 100644 index 0000000000..f5989567a4 --- /dev/null +++ b/modules/ripple_data/protocol/ripple_PackedMessage.cpp @@ -0,0 +1,66 @@ + + +PackedMessage::PackedMessage (::google::protobuf::Message const& message, int type) +{ + unsigned const messageBytes = message.ByteSize (); + + assert (messageBytes != 0); + + mBuffer.resize (kHeaderBytes + messageBytes); + + encodeHeader (messageBytes, type); + + if (messageBytes != 0) + { + message.SerializeToArray (&mBuffer [PackedMessage::kHeaderBytes], messageBytes); + +#ifdef DEBUG +// std::cerr << "PackedMessage: type=" << type << ", datalen=" << msg_size << std::endl; +#endif + } +} + +bool PackedMessage::operator== (PackedMessage const& other) const +{ + return mBuffer == other.mBuffer; +} + +unsigned PackedMessage::getLength (std::vector const& buf) +{ + unsigned result; + + if (buf.size() >= PackedMessage::kHeaderBytes) + { + result = buf [0]; + result <<= 8; result |= buf [1]; + result <<= 8; result |= buf [2]; + result <<= 8; result |= buf [3]; + } + else + { + result = 0; + } + + return result; +} + +int PackedMessage::getType (std::vector const& buf) +{ + if(buf.size() < PackedMessage::kHeaderBytes) + return 0; + + int ret = buf[4]; + ret <<= 8; ret |= buf[5]; + return ret; +} + +void PackedMessage::encodeHeader (unsigned size, int type) +{ + assert(mBuffer.size() >= PackedMessage::kHeaderBytes); + mBuffer[0] = static_cast((size >> 24) & 0xFF); + mBuffer[1] = static_cast((size >> 16) & 0xFF); + mBuffer[2] = static_cast((size >> 8) & 0xFF); + mBuffer[3] = static_cast(size & 0xFF); + mBuffer[4] = static_cast((type >> 8) & 0xFF); + mBuffer[5] = static_cast(type & 0xFF); +} diff --git a/modules/ripple_data/protocol/ripple_PackedMessage.h b/modules/ripple_data/protocol/ripple_PackedMessage.h new file mode 100644 index 0000000000..b0b7c873e4 --- /dev/null +++ b/modules/ripple_data/protocol/ripple_PackedMessage.h @@ -0,0 +1,55 @@ +// +// packaging of messages into length/type-prepended buffers +// ready for transmission. + +#ifndef RIPPLE_PACKEDMESSAGE_H +#define RIPPLE_PACKEDMESSAGE_H + +// PackedMessage implements simple "packing" of protocol buffers Messages into +// a string prepended by a header specifying the message length. +// MessageType should be a Message class generated by the protobuf compiler. +// + +class PackedMessage : public boost::enable_shared_from_this +{ +public: + typedef boost::shared_ptr< ::google::protobuf::Message > MessagePointer; + typedef boost::shared_ptr pointer; + +public: + /** Number of bytes in a message header. + */ + static unsigned const kHeaderBytes = 6; + + PackedMessage (::google::protobuf::Message const& message, int type); + + /** Retrieve the packed message data. + */ + // VFALCO: TODO, shouldn't this be const? + std::vector & getBuffer() + { + return mBuffer; + } + + /** Determine bytewise equality. + */ + bool operator == (PackedMessage const& other) const; + + /** Calculate the length of a packed message. + */ + static unsigned getLength (std::vector const& buf); + + /** Determine the type of a packed message. + */ + static int getType (std::vector const& buf); + +private: + // Encodes the size and type into a header at the beginning of buf + // + void encodeHeader (unsigned size, int type); + + std::vector mBuffer; +}; + +#endif /* PACKEDMESSAGE_H */ + diff --git a/modules/ripple_data/ripple_data.cpp b/modules/ripple_data/ripple_data.cpp index 9029fe0f17..51dafb76ae 100644 --- a/modules/ripple_data/ripple_data.cpp +++ b/modules/ripple_data/ripple_data.cpp @@ -78,6 +78,7 @@ #include "protocol/ripple_FieldNames.cpp" #include "protocol/ripple_LedgerFormat.cpp" +#include "protocol/ripple_PackedMessage.cpp" #include "protocol/ripple_RippleAddress.cpp" #include "protocol/ripple_SerializedTypes.cpp" #include "protocol/ripple_Serializer.cpp" diff --git a/modules/ripple_data/ripple_data.h b/modules/ripple_data/ripple_data.h index 2dae741efa..a41a74bc68 100644 --- a/modules/ripple_data/ripple_data.h +++ b/modules/ripple_data/ripple_data.h @@ -32,14 +32,17 @@ #ifndef RIPPLE_DATA_H #define RIPPLE_DATA_H -#include #include +#include +#include #include #include #include #include #include +#include +#include #include #include #include // VFALCO: NOTE, this looks like junk @@ -54,12 +57,16 @@ // VFALCO: TODO, try to reduce these dependencies #include "../ripple_basics/ripple_basics.h" +// VFALCO: TODO, resolve the location of this file +#include "ripple.pb.h" + #include "crypto/ripple_CBigNum.h" #include "crypto/ripple_Base58.h" // VFALCO: TODO, Can be moved to .cpp if we clean up setAlphabet stuff #include "crypto/ripple_Base58Data.h" // #include "src/cpp/ripple/ProofOfWork.h" #include "protocol/ripple_FieldNames.h" +#include "protocol/ripple_PackedMessage.h" #include "protocol/ripple_RippleAddress.h" #include "protocol/ripple_RippleSystem.h" #include "protocol/ripple_Serializer.h" // needs CKey @@ -69,7 +76,4 @@ #include "protocol/ripple_LedgerFormat.h" // needs SOTemplate from SerializedObject #include "protocol/ripple_TransactionFormat.h" -// VFALCO: TODO, resolve the location of this file -#include "ripple.pb.h" - #endif diff --git a/modules/ripple_main/ripple_main.cpp b/modules/ripple_main/ripple_main.cpp index 16e715acab..15662fc242 100644 --- a/modules/ripple_main/ripple_main.cpp +++ b/modules/ripple_main/ripple_main.cpp @@ -154,7 +154,6 @@ #include "src/cpp/ripple/OrderBook.h" #include "src/cpp/ripple/OrderBookDB.h" #include "src/cpp/ripple/PFRequest.h" -#include "src/cpp/ripple/PackedMessage.h" #include "src/cpp/ripple/ParameterTable.h" #include "src/cpp/ripple/ParseSection.h" #include "src/cpp/ripple/Pathfinder.h" @@ -246,7 +245,6 @@ static DH* handleTmpDh(SSL* ssl, int is_export, int iKeyLength) #include "src/cpp/ripple/Operation.cpp" // no log #include "src/cpp/ripple/OrderBook.cpp" // no log #include "src/cpp/ripple/OrderBookDB.cpp" -#include "src/cpp/ripple/PackedMessage.cpp" // no log #include "src/cpp/ripple/ParameterTable.cpp" // no log #include "src/cpp/ripple/ParseSection.cpp" #include "src/cpp/ripple/Pathfinder.cpp" diff --git a/newcoin.vcxproj b/newcoin.vcxproj index 47e57ecb4c..489c5eb3d1 100644 --- a/newcoin.vcxproj +++ b/newcoin.vcxproj @@ -297,6 +297,12 @@ true true + + true + true + true + true + true true @@ -1294,6 +1300,7 @@ + diff --git a/newcoin.vcxproj.filters b/newcoin.vcxproj.filters index f5ee4ad7b7..e904148829 100644 --- a/newcoin.vcxproj.filters +++ b/newcoin.vcxproj.filters @@ -810,6 +810,9 @@ 1. Modules\ripple_basics\types + + 1. Modules\ripple_data\protocol + @@ -1505,6 +1508,9 @@ 1. Modules\ripple_basics\types + + 1. Modules\ripple_data\protocol + diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 02ae10df9c..d50052a105 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -127,26 +127,6 @@ static void runIO(boost::asio::io_service& io) io.run(); } -bool Application::isNew(const uint256& s) -{ - return mHashRouter->addSuppression(s); -} - -bool Application::isNew(const uint256& s, uint64 p) -{ - return mHashRouter->addSuppressionPeer(s, p); -} - -bool Application::isNew(const uint256& s, uint64 p, int& f) -{ - return mHashRouter->addSuppressionPeer(s, p, f); -} - -bool Application::isNewFlag(const uint256& s, int f) -{ - return mHashRouter->setFlag(s, f); -} - void Application::setup() { mJobQueue.setThreadCount(); diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h index 28b5137fc9..457163a5b7 100644 --- a/src/cpp/ripple/Application.h +++ b/src/cpp/ripple/Application.h @@ -110,7 +110,6 @@ public: NodeCache& getTempNodeCache() { return mTempNodeCache; } HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; } JobQueue& getJobQueue() { return mJobQueue; } - IHashRouter& getHashRouter() { return *mHashRouter; } boost::recursive_mutex& getMasterLock() { return mMasterLock; } ProofOfWorkGenerator& getPowGen() { return mPOWGen; } LoadManager& getLoadManager() { return mLoadMgr; } @@ -118,18 +117,13 @@ public: PeerDoor& getPeerDoor() { return *mPeerDoor; } OrderBookDB& getOrderBookDB() { return mOrderBookDB; } SLECache& getSLECache() { return mSLECache; } - IFeatures& getFeatureTable() { return *mFeatures; } - IFeeVote& getFeeVote() { return *mFeeVote; } + IFeatures& getFeatureTable() { return *mFeatures; } ILoadFeeTrack& getFeeTrack() { return *mFeeTrack; } + IFeeVote& getFeeVote() { return *mFeeVote; } + IHashRouter& getHashRouter() { return *mHashRouter; } IValidations& getValidations() { return *mValidations; } - // VFALCO: TODO, eliminate these, change callers to just call IHashRouter directly! - bool isNew(const uint256& s); - bool isNew(const uint256& s, uint64 p); - bool isNew(const uint256& s, uint64 p, int& f); - bool isNewFlag(const uint256& s, int f); - // VFALCO: TODO, Move these to the .cpp bool running() { return mTxnDB != NULL; } // VFALCO: TODO, replace with nullptr when beast is available bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); } diff --git a/src/cpp/ripple/ConnectionPool.h b/src/cpp/ripple/ConnectionPool.h index 70e627f489..af1305a962 100644 --- a/src/cpp/ripple/ConnectionPool.h +++ b/src/cpp/ripple/ConnectionPool.h @@ -7,7 +7,6 @@ #include #include "Peer.h" -#include "PackedMessage.h" // // Access to the Ripple network. diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp index 4e58876c54..12e69e981a 100644 --- a/src/cpp/ripple/Ledger.cpp +++ b/src/cpp/ripple/Ledger.cpp @@ -1574,7 +1574,7 @@ uint32 Ledger::roundCloseTime(uint32 closeTime, uint32 closeResolution) void Ledger::pendSave(bool fromConsensus) { - if (!fromConsensus && !theApp->isNewFlag(getHash(), SF_SAVED)) + if (!fromConsensus && !theApp->getHashRouter ().setFlag (getHash(), SF_SAVED)) return; assert(isImmutable()); diff --git a/src/cpp/ripple/LedgerConsensus.cpp b/src/cpp/ripple/LedgerConsensus.cpp index aa7e0addf2..1e2c8e5225 100644 --- a/src/cpp/ripple/LedgerConsensus.cpp +++ b/src/cpp/ripple/LedgerConsensus.cpp @@ -188,7 +188,7 @@ void LedgerConsensus::checkOurValidation() (mPreviousLedger->getHash(), theApp->getOPs().getValidationTimeNC(), mValPublic, false); v->setTrusted(); v->sign(signingHash, mValPrivate); - theApp->isNew(signingHash); + theApp->getHashRouter ().addSuppression (signingHash); theApp->getValidations().addValidation(v, "localMissing"); std::vector validation = v->getSigned(); ripple::TMValidation val; @@ -561,6 +561,7 @@ void LedgerConsensus::stateAccepted() endConsensus(); } +// VFALCO: TODO implement shutdown without a naked global extern volatile bool doShutdown; void LedgerConsensus::timerEntry() @@ -858,7 +859,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec txn->setVote(pit.first, cit->second->hasItem(txID)); } - if (theApp->isNewFlag(txID, SF_RELAYED)) + if (theApp->getHashRouter ().setFlag (txID, SF_RELAYED)) { ripple::TMTransaction msg; msg.set_rawtransaction(&(tx.front()), tx.size()); @@ -1033,7 +1034,7 @@ int LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTrans TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE; if (retryAssured) parms = static_cast(parms | tapRETRY); - if (theApp->isNewFlag(txn->getTransactionID(), SF_SIGGOOD)) + if (theApp->getHashRouter ().setFlag (txn->getTransactionID(), SF_SIGGOOD)) parms = static_cast(parms | tapNO_CHECK_SIGN); WriteLog (lsDEBUG, LedgerConsensus) << "TXN " << txn->getTransactionID() @@ -1223,7 +1224,7 @@ void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer) } v->sign(signingHash, mValPrivate); v->setTrusted(); - theApp->isNew(signingHash); // suppress it if we receive it + theApp->getHashRouter ().addSuppression (signingHash); // suppress it if we receive it theApp->getValidations().addValidation(v, "local"); theApp->getOPs().setLastValidation(v); std::vector validation = v->getSigned(); diff --git a/src/cpp/ripple/LedgerConsensus.h b/src/cpp/ripple/LedgerConsensus.h index 4e9a3e5be8..7237c51285 100644 --- a/src/cpp/ripple/LedgerConsensus.h +++ b/src/cpp/ripple/LedgerConsensus.h @@ -27,7 +27,16 @@ class TransactionAcquire public: typedef boost::shared_ptr pointer; -protected: +public: + TransactionAcquire(const uint256& hash); + virtual ~TransactionAcquire() { ; } + + SHAMap::ref getMap() { return mMap; } + + SMAddNode takeNodes(const std::list& IDs, + const std::list< std::vector >& data, Peer::ref); + +private: SHAMap::pointer mMap; bool mHaveRoot; @@ -37,26 +46,11 @@ protected: void done(); void trigger(Peer::ref); boost::weak_ptr pmDowncast(); - -public: - TransactionAcquire(const uint256& hash); - virtual ~TransactionAcquire() { ; } - - SHAMap::ref getMap() { return mMap; } - - SMAddNode takeNodes(const std::list& IDs, - const std::list< std::vector >& data, Peer::ref); }; +// A transaction that may be disputed class LCTransaction -{ // A transaction that may be disputed -protected: - uint256 mTransactionID; - int mYays, mNays; - bool mOurVote; - Serializer transaction; - boost::unordered_map mVotes; - +{ public: typedef boost::shared_ptr pointer; @@ -73,6 +67,13 @@ public: bool updateVote(int percentTime, bool proposing); Json::Value getJson(); + +private: + uint256 mTransactionID; + int mYays, mNays; + bool mOurVote; + Serializer transaction; + boost::unordered_map mVotes; }; enum LCState @@ -85,76 +86,6 @@ enum LCState class LedgerConsensus : public boost::enable_shared_from_this, IS_INSTANCE(LedgerConsensus) { -protected: - LCState mState; - uint32 mCloseTime; // The wall time this ledger closed - uint256 mPrevLedgerHash, mNewLedgerHash; - Ledger::pointer mPreviousLedger; - LedgerAcquire::pointer mAcquiringLedger; - LedgerProposal::pointer mOurPosition; - RippleAddress mValPublic, mValPrivate; - bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail; - - int mCurrentMSeconds, mClosePercent, mCloseResolution; - bool mHaveCloseTimeConsensus; - - boost::posix_time::ptime mConsensusStartTime; - int mPreviousProposers; - int mPreviousMSeconds; - - // Convergence tracking, trusted peers indexed by hash of public key - boost::unordered_map mPeerPositions; - - // Transaction Sets, indexed by hash of transaction tree - boost::unordered_map mAcquired; - boost::unordered_map mAcquiring; - - // Peer sets - boost::unordered_map > > mPeerData; - - // Disputed transactions - boost::unordered_map mDisputes; - - // Close time estimates - std::map mCloseTimes; - - // nodes that have bowed out of this consensus process - boost::unordered_set mDeadNodes; - - // final accept logic - void accept(SHAMap::ref txSet, LoadEvent::pointer); - - void weHave(const uint256& id, Peer::ref avoidPeer); - void startAcquiring(TransactionAcquire::pointer); - SHAMap::pointer find(const uint256& hash); - - void createDisputes(SHAMap::ref, SHAMap::ref); - void addDisputedTransaction(const uint256&, const std::vector& transaction); - void adjustCount(SHAMap::ref map, const std::vector& peers); - void propose(); - - void addPosition(LedgerProposal&, bool ours); - void removePosition(LedgerProposal&, bool ours); - void sendHaveTxSet(const uint256& set, bool direct); - void applyTransactions(SHAMap::ref transactionSet, Ledger::ref targetLedger, - Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr); - int applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref targetLedger, - bool openLgr, bool retryAssured); - - uint32 roundCloseTime(uint32 closeTime); - - // manipulating our own position - void statusChange(ripple::NodeEvent, Ledger& ledger); - void takeInitialPosition(Ledger& initialLedger); - void updateOurPositions(); - void playbackProposals(); - int getThreshold(); - void closeLedger(); - void checkOurValidation(); - - void beginAccept(bool synchronous); - void endConsensus(); - public: LedgerConsensus(const uint256& prevLCLHash, Ledger::ref previousLedger, uint32 closeTime); @@ -193,6 +124,77 @@ public: // test/debug void simulate(); + +private: + // final accept logic + void accept(SHAMap::ref txSet, LoadEvent::pointer); + + void weHave(const uint256& id, Peer::ref avoidPeer); + void startAcquiring(TransactionAcquire::pointer); + SHAMap::pointer find(const uint256& hash); + + void createDisputes(SHAMap::ref, SHAMap::ref); + void addDisputedTransaction(const uint256&, const std::vector& transaction); + void adjustCount(SHAMap::ref map, const std::vector& peers); + void propose(); + + void addPosition(LedgerProposal&, bool ours); + void removePosition(LedgerProposal&, bool ours); + void sendHaveTxSet(const uint256& set, bool direct); + void applyTransactions(SHAMap::ref transactionSet, Ledger::ref targetLedger, + Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr); + int applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref targetLedger, + bool openLgr, bool retryAssured); + + uint32 roundCloseTime(uint32 closeTime); + + // manipulating our own position + void statusChange(ripple::NodeEvent, Ledger& ledger); + void takeInitialPosition(Ledger& initialLedger); + void updateOurPositions(); + void playbackProposals(); + int getThreshold(); + void closeLedger(); + void checkOurValidation(); + + void beginAccept(bool synchronous); + void endConsensus(); + +private: + LCState mState; + uint32 mCloseTime; // The wall time this ledger closed + uint256 mPrevLedgerHash, mNewLedgerHash; + Ledger::pointer mPreviousLedger; + LedgerAcquire::pointer mAcquiringLedger; + LedgerProposal::pointer mOurPosition; + RippleAddress mValPublic, mValPrivate; + bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail; + + int mCurrentMSeconds, mClosePercent, mCloseResolution; + bool mHaveCloseTimeConsensus; + + boost::posix_time::ptime mConsensusStartTime; + int mPreviousProposers; + int mPreviousMSeconds; + + // Convergence tracking, trusted peers indexed by hash of public key + boost::unordered_map mPeerPositions; + + // Transaction Sets, indexed by hash of transaction tree + boost::unordered_map mAcquired; + boost::unordered_map mAcquiring; + + // Peer sets + boost::unordered_map > > mPeerData; + + // Disputed transactions + boost::unordered_map mDisputes; + + // Close time estimates + std::map mCloseTimes; + + // nodes that have bowed out of this consensus process + boost::unordered_set mDeadNodes; }; diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 2b9fac9a1a..2146ee5772 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -118,7 +118,7 @@ Ledger::pointer LedgerMaster::closeLedger(bool recover) { TransactionEngineParams tepFlags = tapOPEN_LEDGER; - if (theApp->isNew(it->first.getTXID(), SF_SIGGOOD)) + if (theApp->getHashRouter ().addSuppressionPeer (it->first.getTXID(), SF_SIGGOOD)) tepFlags = static_cast(tepFlags | tapNO_CHECK_SIGN); bool didApply; diff --git a/src/cpp/ripple/LoadManager.h b/src/cpp/ripple/LoadManager.h index 375648f67c..bdafa53fc9 100644 --- a/src/cpp/ripple/LoadManager.h +++ b/src/cpp/ripple/LoadManager.h @@ -5,6 +5,7 @@ #include +// VFALCO: TODO replace LT_ with loadType in constants enum LoadType { // types of load that can be placed on the server @@ -44,8 +45,9 @@ public: LoadCost(LoadType t, int cost, int cat) : mType(t), mCost(cost), mCategories(cat) { ; } }; +// a single endpoint that can impose load class LoadSource -{ // a single endpoint that can impose load +{ private: // VFALCO: Make this not a friend friend class LoadManager; @@ -97,9 +99,9 @@ private: bool mLogged; }; - +// a collection of load sources class LoadManager -{ // a collection of load sources +{ public: LoadManager(int creditRate = 100, int creditLimit = 500, int debitWarn = -500, int debitLimit = -1000); diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 700a256582..ea538ff433 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -167,7 +167,7 @@ void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, uint256 suppress = trans->getTransactionID(); int flags; - if (theApp->isNew(suppress, 0, flags) && ((flags & SF_RETRY) != 0)) + if (theApp->getHashRouter ().addSuppressionPeer (suppress, 0, flags) && ((flags & SF_RETRY) != 0)) { WriteLog (lsWARNING, NetworkOPs) << "Redundant transactions submitted"; return; @@ -186,10 +186,10 @@ void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, if (!trans->checkSign()) { WriteLog (lsWARNING, NetworkOPs) << "Submitted transaction has bad signature"; - theApp->isNewFlag(suppress, SF_BAD); + theApp->getHashRouter ().setFlag (suppress, SF_BAD); return; } - theApp->isNewFlag(suppress, SF_SIGGOOD); + theApp->getHashRouter ().setFlag (suppress, SF_SIGGOOD); } catch (...) { @@ -260,9 +260,9 @@ void NetworkOPs::runTransactionQueue() dbtx->setResult(r); if (isTemMalformed(r)) // malformed, cache bad - theApp->isNewFlag(txn->getID(), SF_BAD); + theApp->getHashRouter ().setFlag (txn->getID(), SF_BAD); else if(isTelLocal(r) || isTerRetry(r)) // can be retried - theApp->isNewFlag(txn->getID(), SF_RETRY); + theApp->getHashRouter ().setFlag (txn->getID(), SF_RETRY); if (isTerRetry(r)) @@ -333,10 +333,10 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, WriteLog (lsINFO, NetworkOPs) << "Transaction has bad signature"; trans->setStatus(INVALID); trans->setResult(temBAD_SIGNATURE); - theApp->isNewFlag(trans->getID(), SF_BAD); + theApp->getHashRouter ().setFlag (trans->getID(), SF_BAD); return trans; } - theApp->isNewFlag(trans->getID(), SF_SIGGOOD); + theApp->getHashRouter ().setFlag (trans->getID(), SF_SIGGOOD); } boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock()); @@ -347,9 +347,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, trans->setResult(r); if (isTemMalformed(r)) // malformed, cache bad - theApp->isNewFlag(trans->getID(), SF_BAD); + theApp->getHashRouter ().setFlag (trans->getID(), SF_BAD); else if(isTelLocal(r) || isTerRetry(r)) // can be retried - theApp->isNewFlag(trans->getID(), SF_RETRY); + theApp->getHashRouter ().setFlag (trans->getID(), SF_RETRY); #ifdef DEBUG if (r != tesSUCCESS) diff --git a/src/cpp/ripple/PackedMessage.cpp b/src/cpp/ripple/PackedMessage.cpp deleted file mode 100644 index 190837dea0..0000000000 --- a/src/cpp/ripple/PackedMessage.cpp +++ /dev/null @@ -1,53 +0,0 @@ -#include "PackedMessage.h" - - -void PackedMessage::encodeHeader(unsigned size, int type) -{ - assert(mBuffer.size() >= HEADER_SIZE); - mBuffer[0] = static_cast((size >> 24) & 0xFF); - mBuffer[1] = static_cast((size >> 16) & 0xFF); - mBuffer[2] = static_cast((size >> 8) & 0xFF); - mBuffer[3] = static_cast(size & 0xFF); - mBuffer[4] = static_cast((type >> 8) & 0xFF); - mBuffer[5] = static_cast(type & 0xFF); -} - -PackedMessage::PackedMessage(const ::google::protobuf::Message &message, int type) -{ - unsigned msg_size = message.ByteSize(); - assert(msg_size); - mBuffer.resize(HEADER_SIZE + msg_size); - encodeHeader(msg_size, type); - if (msg_size) - { - message.SerializeToArray(&mBuffer[HEADER_SIZE], msg_size); -#ifdef DEBUG -// std::cerr << "PackedMessage: type=" << type << ", datalen=" << msg_size << std::endl; -#endif - } -} - -bool PackedMessage::operator == (const PackedMessage& other) -{ - return (mBuffer == other.mBuffer); -} - -unsigned PackedMessage::getLength(std::vector& buf) -{ - if(buf.size() < HEADER_SIZE) - return 0; - - int ret = buf[0]; - ret <<= 8; ret |= buf[1]; ret <<= 8; ret |= buf[2]; ret <<= 8; ret |= buf[3]; - return ret; -} - -int PackedMessage::getType(std::vector& buf) -{ - if(buf.size() < HEADER_SIZE) - return 0; - - int ret = buf[4]; - ret <<= 8; ret |= buf[5]; - return ret; -} diff --git a/src/cpp/ripple/PackedMessage.h b/src/cpp/ripple/PackedMessage.h deleted file mode 100644 index 7b226c4a39..0000000000 --- a/src/cpp/ripple/PackedMessage.h +++ /dev/null @@ -1,74 +0,0 @@ -// -// packaging of messages into length/type-prepended buffers -// ready for transmission. - -#ifndef PACKEDMESSAGE_H -#define PACKEDMESSAGE_H - -#include -#include -#include -#include - -#include -#include -#include - -// The header size for packed messages -// len(4)+type(2) -const unsigned HEADER_SIZE = 6; - - -// PackedMessage implements simple "packing" of protocol buffers Messages into -// a string prepended by a header specifying the message length. -// MessageType should be a Message class generated by the protobuf compiler. -// - -class PackedMessage : public boost::enable_shared_from_this -{ - - std::vector mBuffer; - - // Encodes the size and type into a header at the beginning of buf - // - void encodeHeader(unsigned size, int type); - -public: - typedef boost::shared_ptr< ::google::protobuf::Message > MessagePointer; - typedef boost::shared_ptr pointer; - - PackedMessage(const ::google::protobuf::Message& message, int type); - - std::vector& getBuffer() { return(mBuffer); } - - static unsigned getLength(std::vector& buf); - static int getType(std::vector& buf); - - bool operator == (const PackedMessage& other); - - /* - void setMsg(MessagePointer msg, int type); - - MessagePointer getMsg(); - - // Pack the message into the given data_buffer. The buffer is resized to - // exactly fit the message. - // Return false in case of an error, true if successful. - // - bool pack(data_buffer& buf) const; - - // Given a buffer with the first HEADER_SIZE bytes representing the header, - // decode the header and return the message length. Return 0 in case of - // an error. - // - unsigned decodeHeader(const data_buffer& buf) const; - - // Unpack and store a message from the given packed buffer. - // Return true if unpacking successful, false otherwise. - // - bool unpack(const data_buffer& buf); - */ -}; - -#endif /* PACKEDMESSAGE_H */ - diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 352c6a29dd..ab84bd4446 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -345,7 +345,7 @@ void Peer::startReadHeader() if (!mDetaching) { mReadbuf.clear(); - mReadbuf.resize(HEADER_SIZE); + mReadbuf.resize(PackedMessage::kHeaderBytes); boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), mIOStrand.wrap( boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error))); @@ -354,15 +354,15 @@ void Peer::startReadHeader() void Peer::startReadBody(unsigned msg_len) { - // m_readbuf already contains the header in its first HEADER_SIZE + // m_readbuf already contains the header in its first PackedMessage::kHeaderBytes // bytes. Expand it to fit in the body as well, and start async // read into the body. if (!mDetaching) { - mReadbuf.resize(HEADER_SIZE + msg_len); + mReadbuf.resize(PackedMessage::kHeaderBytes + msg_len); - boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len), + boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[PackedMessage::kHeaderBytes], msg_len), mIOStrand.wrap(boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error))); } } @@ -430,7 +430,7 @@ void Peer::processReadBuffer() { // must not hold peer lock int type = PackedMessage::getType(mReadbuf); #ifdef DEBUG -// std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-HEADER_SIZE) << std::endl; +// std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-PackedMessage::kHeaderBytes) << std::endl; #endif // std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl; @@ -453,7 +453,7 @@ void Peer::processReadBuffer() { event->reName("Peer::hello"); ripple::TMHello msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvHello(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -464,7 +464,7 @@ void Peer::processReadBuffer() { event->reName("Peer::errormessage"); ripple::TMErrorMsg msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvErrorMessage(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -475,7 +475,7 @@ void Peer::processReadBuffer() { event->reName("Peer::ping"); ripple::TMPing msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvPing(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -486,7 +486,7 @@ void Peer::processReadBuffer() { event->reName("Peer::getcontacts"); ripple::TMGetContacts msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetContacts(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -498,7 +498,7 @@ void Peer::processReadBuffer() event->reName("Peer::contact"); ripple::TMContact msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvContact(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -510,7 +510,7 @@ void Peer::processReadBuffer() event->reName("Peer::getpeers"); ripple::TMGetPeers msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetPeers(msg, sl); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -522,7 +522,7 @@ void Peer::processReadBuffer() event->reName("Peer::peers"); ripple::TMPeers msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvPeers(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -533,7 +533,7 @@ void Peer::processReadBuffer() { event->reName("Peer::searchtransaction"); ripple::TMSearchTransaction msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvSearchTransaction(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -544,7 +544,7 @@ void Peer::processReadBuffer() { event->reName("Peer::getaccount"); ripple::TMGetAccount msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetAccount(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -555,7 +555,7 @@ void Peer::processReadBuffer() { event->reName("Peer::account"); ripple::TMAccount msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvAccount(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -566,7 +566,7 @@ void Peer::processReadBuffer() { event->reName("Peer::transaction"); ripple::TMTransaction msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvTransaction(msg, sl); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -577,7 +577,7 @@ void Peer::processReadBuffer() { event->reName("Peer::statuschange"); ripple::TMStatusChange msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvStatus(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -588,7 +588,7 @@ void Peer::processReadBuffer() { event->reName("Peer::propose"); boost::shared_ptr msg = boost::make_shared(); - if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvPropose(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -599,7 +599,7 @@ void Peer::processReadBuffer() { event->reName("Peer::getledger"); ripple::TMGetLedger msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetLedger(msg, sl); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -610,7 +610,7 @@ void Peer::processReadBuffer() { event->reName("Peer::ledgerdata"); boost::shared_ptr msg = boost::make_shared(); - if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvLedger(msg, sl); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -621,7 +621,7 @@ void Peer::processReadBuffer() { event->reName("Peer::haveset"); ripple::TMHaveTransactionSet msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvHaveTxSet(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -632,7 +632,7 @@ void Peer::processReadBuffer() { event->reName("Peer::validation"); boost::shared_ptr msg = boost::make_shared(); - if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvValidation(msg, sl); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -642,7 +642,7 @@ void Peer::processReadBuffer() case ripple::mtGET_VALIDATION: { ripple::TM msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recv(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -654,7 +654,7 @@ void Peer::processReadBuffer() { event->reName("Peer::getobjects"); boost::shared_ptr msg = boost::make_shared(); - if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg->ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvGetObjectByHash(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -665,7 +665,7 @@ void Peer::processReadBuffer() { event->reName("Peer::proofofwork"); ripple::TMProofWork msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + if (msg.ParseFromArray(&mReadbuf[PackedMessage::kHeaderBytes], mReadbuf.size() - PackedMessage::kHeaderBytes)) recvProofWork(msg); else WriteLog (lsWARNING, Peer) << "parse error: " << type; @@ -876,7 +876,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLock SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); int flags; - if (!theApp->isNew(stx->getTransactionID(), mPeerId, flags)) + if (! theApp->getHashRouter ().addSuppressionPeer (stx->getTransactionID(), mPeerId, flags)) { // we have seen this transaction recently if ((flags & SF_BAD) != 0) { @@ -1004,7 +1004,7 @@ void Peer::recvPropose(const boost::shared_ptr& packet) s.add256(prevLedger); uint256 suppression = s.getSHA512Half(); - if (!theApp->isNew(suppression, mPeerId)) + if (! theApp->getHashRouter ().addSuppressionPeer (suppression, mPeerId)) { WriteLog (lsTRACE, Peer) << "Received duplicate proposal from peer " << mPeerId; return; @@ -1102,7 +1102,7 @@ void Peer::recvValidation(const boost::shared_ptr& packet, SerializedValidation::pointer val = boost::make_shared(boost::ref(sit), false); uint256 signingHash = val->getSigningHash(); - if (!theApp->isNew(signingHash, mPeerId)) + if (! theApp->getHashRouter ().addSuppressionPeer (signingHash, mPeerId)) { WriteLog (lsTRACE, Peer) << "Validation is duplicate"; return; diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index f5f2846e6e..6cc2da0e42 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -8,7 +8,6 @@ #include #include -#include "PackedMessage.h" #include "Ledger.h" #include "Transaction.h" #include "ProofOfWork.h" @@ -18,16 +17,73 @@ typedef std::pair ipPort; DEFINE_INSTANCE(Peer); -class Peer : public boost::enable_shared_from_this, public IS_INSTANCE(Peer) +class Peer : public boost::enable_shared_from_this + , public IS_INSTANCE (Peer) { public: typedef boost::shared_ptr pointer; typedef const boost::shared_ptr& ref; - static const int psbGotHello = 0, psbSentHello = 1, psbInMap = 2, psbTrusted = 3; - static const int psbNoLedgers = 4, psbNoTransactions = 5, psbDownLevel = 6; + static int const psbGotHello = 0; + static int const psbSentHello = 1; + static int const psbInMap = 2; + static int const psbTrusted = 3; + static int const psbNoLedgers = 4; + static int const psbNoTransactions = 5; + static int const psbDownLevel = 6; - void handleConnect(const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); +public: + //bool operator == (const Peer& other); + + void handleConnect (const boost::system::error_code& error, boost::asio::ip::tcp::resolver::iterator it); + + std::string& getIP() { return mIpPort.first; } + std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; } + int getPort() { return mIpPort.second; } + + void setIpPort(const std::string& strIP, int iPort); + + static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id, bool inbound) + { + return pointer(new Peer(io_service, ctx, id, inbound)); + } + + boost::asio::ssl::stream::lowest_layer_type& getSocket() + { + return mSocketSsl.lowest_layer(); + } + + void connect(const std::string& strIp, int iPort); + void connected(const boost::system::error_code& error); + void detach(const char *, bool onIOStrand); + bool samePeer(Peer::ref p) { return samePeer(*p); } + bool samePeer(const Peer& p) { return this == &p; } + + void sendPacket(const PackedMessage::pointer& packet, bool onStrand); + void sendLedgerProposal(Ledger::ref ledger); + void sendFullLedger(Ledger::ref ledger); + void sendGetFullLedger(uint256& hash); + void sendGetPeers(); + + void punishPeer(LoadType); + + // VFALCO: NOTE, what's with this odd parameter passing? Why the static member? + static void punishPeer(const boost::weak_ptr&, LoadType); + + Json::Value getJson(); + bool isConnected() const { return mHelloed && !mDetaching; } + bool isInbound() const { return mInbound; } + bool isOutbound() const { return !mInbound; } + + const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; } + bool hasLedger(const uint256& hash, uint32 seq) const; + bool hasTxSet(const uint256& hash) const; + uint64 getPeerId() const { return mPeerId; } + + const RippleAddress& getNodePublic() const { return mNodePublic; } + void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } + bool hasProto(int version); + bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } private: bool mInbound; // Connection is inbound @@ -59,7 +115,7 @@ private: void handleVerifyTimer(const boost::system::error_code& ecResult); void handlePingTimer(const boost::system::error_code& ecResult); -protected: +private: boost::asio::io_service::strand mIOStrand; std::vector mReadbuf; std::list mSendQ; @@ -110,57 +166,8 @@ protected: void doFetchPack(const boost::shared_ptr& packet); + // VFALCO: NOTE, why is this a static member instead of a regular member? static void doProofOfWork(Job&, boost::weak_ptr, ProofOfWork::pointer); - -public: - - //bool operator == (const Peer& other); - - std::string& getIP() { return mIpPort.first; } - std::string getDisplayName() { return mCluster ? mNodeName : mIpPort.first; } - int getPort() { return mIpPort.second; } - - void setIpPort(const std::string& strIP, int iPort); - - static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id, bool inbound) - { - return pointer(new Peer(io_service, ctx, id, inbound)); - } - - boost::asio::ssl::stream::lowest_layer_type& getSocket() - { - return mSocketSsl.lowest_layer(); - } - - void connect(const std::string& strIp, int iPort); - void connected(const boost::system::error_code& error); - void detach(const char *, bool onIOStrand); - bool samePeer(Peer::ref p) { return samePeer(*p); } - bool samePeer(const Peer& p) { return this == &p; } - - void sendPacket(const PackedMessage::pointer& packet, bool onStrand); - void sendLedgerProposal(Ledger::ref ledger); - void sendFullLedger(Ledger::ref ledger); - void sendGetFullLedger(uint256& hash); - void sendGetPeers(); - - void punishPeer(LoadType); - static void punishPeer(const boost::weak_ptr&, LoadType); - - Json::Value getJson(); - bool isConnected() const { return mHelloed && !mDetaching; } - bool isInbound() const { return mInbound; } - bool isOutbound() const { return !mInbound; } - - const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; } - bool hasLedger(const uint256& hash, uint32 seq) const; - bool hasTxSet(const uint256& hash) const; - uint64 getPeerId() const { return mPeerId; } - - const RippleAddress& getNodePublic() const { return mNodePublic; } - void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } - bool hasProto(int version); - bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } }; #endif diff --git a/src/cpp/ripple/PeerDoor.h b/src/cpp/ripple/PeerDoor.h index a6146e4117..93b7f7619d 100644 --- a/src/cpp/ripple/PeerDoor.h +++ b/src/cpp/ripple/PeerDoor.h @@ -12,6 +12,11 @@ Handles incoming connections from other Peers class PeerDoor { +public: + PeerDoor (boost::asio::io_service& io_service); + + boost::asio::ssl::context& getSSLContext() { return mCtx; } + private: boost::asio::ip::tcp::acceptor mAcceptor; boost::asio::ssl::context mCtx; @@ -19,10 +24,6 @@ private: void startListening(); void handleConnect(Peer::pointer new_connection, const boost::system::error_code& error); - -public: - PeerDoor(boost::asio::io_service& io_service); - boost::asio::ssl::context& getSSLContext() { return mCtx; } }; #endif diff --git a/src/cpp/ripple/SHAMap.cpp b/src/cpp/ripple/SHAMap.cpp index ed2f81a4aa..066e06fc69 100644 --- a/src/cpp/ripple/SHAMap.cpp +++ b/src/cpp/ripple/SHAMap.cpp @@ -446,8 +446,9 @@ SHAMapItem::pointer SHAMap::peekNextItem(const uint256& id, SHAMapTreeNode::TNTy return no_item; } +// Get a pointer to the previous item in the tree after a given item - item must be in tree SHAMapItem::pointer SHAMap::peekPrevItem(const uint256& id) -{ // Get a pointer to the previous item in the tree after a given item - item must be in tree +{ boost::recursive_mutex::scoped_lock sl(mLock); std::stack stack = getStack(id, true, false); @@ -461,17 +462,25 @@ SHAMapItem::pointer SHAMap::peekPrevItem(const uint256& id) if (node->peekItem()->getTag() < id) return node->peekItem(); } - else for (int i = node->selectBranch(id) - 1; i >= 0; --i) - if (!node->isEmptyBranch(i)) - { - node = getNode(node->getChildNodeID(i), node->getChildHash(i), false); - SHAMapTreeNode* item = firstBelow(node.get()); - if (!item) - throw std::runtime_error("missing node"); - return item->peekItem(); - } + else + { + for (int i = node->selectBranch(id) - 1; i >= 0; --i) + { + if (!node->isEmptyBranch(i)) + { + node = getNode(node->getChildNodeID(i), node->getChildHash(i), false); + SHAMapTreeNode* item = firstBelow(node.get()); + + if (!item) + throw std::runtime_error("missing node"); + + return item->peekItem(); + } + } + } } - // must be last item + + // must be last item return no_item; }