From 62354350a354badf2daf3c58a3febbb30dc346bc Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 9 Apr 2014 20:38:23 -0700 Subject: [PATCH] Refactor ripple_overlay: * Use rvalue move to receive accepted sockets * Split asio dependent APIs to their own class and file * Update documentation * Organize code into different files * Make some members private * Rename things for clarity --- Builds/VisualStudio2013/RippleD.vcxproj | 13 +- .../VisualStudio2013/RippleD.vcxproj.filters | 42 +- src/ripple/peerfinder/README.md | 15 +- src/ripple/peerfinder/api/Callback.h | 2 + src/ripple/peerfinder/api/Manager.h | 3 + src/ripple/peerfinder/api/Slot.h | 2 + src/ripple/peerfinder/api/Types.h | 2 + src/ripple/resource/api/Consumer.h | 3 + src/ripple/resource/api/Manager.h | 3 + src/ripple/sitefiles/api/Listener.h | 2 + src/ripple/sitefiles/api/Manager.h | 2 + src/ripple/sitefiles/api/Section.h | 5 +- src/ripple/sitefiles/api/SiteFile.h | 7 +- src/ripple/types/api/RippleAccountID.h | 2 + src/ripple/types/api/RipplePublicKeyHash.h | 2 + src/ripple/types/api/UInt160.h | 1 + src/ripple/types/api/UInt256.h | 2 + src/ripple/types/api/base_uint.h | 2 + src/ripple_app/consensus/LedgerConsensus.cpp | 40 +- src/ripple_app/consensus/LedgerConsensus.h | 4 +- src/ripple_app/ledger/InboundLedger.cpp | 20 +- src/ripple_app/ledger/InboundLedger.h | 4 +- src/ripple_app/ledger/LedgerMaster.cpp | 8 +- src/ripple_app/main/Application.cpp | 13 +- src/ripple_app/main/Application.h | 4 +- src/ripple_app/misc/NetworkOPs.cpp | 44 +- src/ripple_app/peers/PeerSet.cpp | 18 +- src/ripple_app/peers/PeerSet.h | 12 +- src/ripple_app/ripple_app.h | 5 +- src/ripple_app/rpc/RPCHandler.cpp | 6 +- src/ripple_app/tx/TransactionAcquire.cpp | 14 +- src/ripple_app/tx/TransactionAcquire.h | 6 +- src/ripple_core/nodestore/api/NodeObject.h | 8 - src/ripple_data/crypto/Base58Data.h | 2 + src/ripple_data/protocol/Protocol.h | 2 + src/ripple_data/protocol/RippleAddress.h | 12 + src/ripple_data/protocol/ripple.proto | 2 +- src/ripple_overlay/README.md | 16 +- .../api/{PackedMessage.h => Message.h} | 33 +- src/ripple_overlay/api/Overlay.h | 131 ++++ src/ripple_overlay/api/Peer.h | 99 ++- src/ripple_overlay/api/Peers.h | 300 --------- src/ripple_overlay/api/make_Overlay.h | 62 ++ src/ripple_overlay/api/predicates.h | 174 +++++ .../impl/{PackedMessage.cpp => Message.cpp} | 36 +- src/ripple_overlay/impl/MessageStream.h | 43 ++ src/ripple_overlay/impl/OverlayImpl.cpp | 598 ++++++++++++++++ src/ripple_overlay/impl/OverlayImpl.h | 226 +++++++ src/ripple_overlay/impl/PeerDoor.cpp | 87 +-- src/ripple_overlay/impl/PeerDoor.h | 16 +- src/ripple_overlay/impl/PeerImp.h | 303 +++++---- src/ripple_overlay/impl/Peers.cpp | 637 ------------------ src/ripple_overlay/ripple_overlay.cpp | 13 +- src/ripple_overlay/ripple_overlay.h | 7 - 54 files changed, 1728 insertions(+), 1387 deletions(-) rename src/ripple_overlay/api/{PackedMessage.h => Message.h} (70%) create mode 100644 src/ripple_overlay/api/Overlay.h delete mode 100644 src/ripple_overlay/api/Peers.h create mode 100644 src/ripple_overlay/api/make_Overlay.h create mode 100644 src/ripple_overlay/api/predicates.h rename src/ripple_overlay/impl/{PackedMessage.cpp => Message.cpp} (61%) create mode 100644 src/ripple_overlay/impl/MessageStream.h create mode 100644 src/ripple_overlay/impl/OverlayImpl.cpp create mode 100644 src/ripple_overlay/impl/OverlayImpl.h delete mode 100644 src/ripple_overlay/impl/Peers.cpp diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index c0449e29e..6659a016e 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -1854,7 +1854,7 @@ true true - + true true true @@ -1866,7 +1866,7 @@ true true - + true true true @@ -2683,12 +2683,15 @@ - + + - + + + + - diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index f043ee5ac..48f8b37cb 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -1413,12 +1413,6 @@ [2] Old Ripple\ripple_overlay\impl - - [2] Old Ripple\ripple_overlay\impl - - - [2] Old Ripple\ripple_overlay\impl - [2] Old Ripple\ripple_overlay @@ -1509,15 +1503,18 @@ [2] Old Ripple\ripple_app\tx - - [1] Ripple\common\tests - [2] Old Ripple\ripple_app\book\tests [2] Old Ripple\ripple_app\book\tests + + [2] Old Ripple\ripple_overlay\impl + + + [2] Old Ripple\ripple_overlay\impl + @@ -2984,15 +2981,6 @@ [2] Old Ripple\ripple_overlay\api - - [2] Old Ripple\ripple_overlay\api - - - [2] Old Ripple\ripple_overlay\api - - - [2] Old Ripple\ripple_overlay - [1] Ripple\peerfinder\impl @@ -3105,6 +3093,24 @@ [2] Old Ripple\ripple_app\book + + [2] Old Ripple\ripple_overlay\api + + + [2] Old Ripple\ripple_overlay\impl + + + [2] Old Ripple\ripple_overlay\api + + + [2] Old Ripple\ripple_overlay\impl + + + [2] Old Ripple\ripple_overlay\api + + + [2] Old Ripple\ripple_overlay\api + diff --git a/src/ripple/peerfinder/README.md b/src/ripple/peerfinder/README.md index 092425698..17b2aa715 100644 --- a/src/ripple/peerfinder/README.md +++ b/src/ripple/peerfinder/README.md @@ -3,12 +3,15 @@ ## Introduction -Each _peer_ (a running instance of the **rippled** program) on the Ripple network -maintains multiple TCP/IP connections to other peers (neighbors) who themselves -have neighboring peers. The resulting network is called a _peer to peer overlay -network_, or just [_overlay network_][overlay_network]. Messages passed along these -connections travel between peers and implement the communication layer of the -_Ripple peer protocol_. +The _Ripple payment network_ consists of a collection of _peers_ running the +**rippled software**. Each peer maintains multiple outgoing connections and +optional incoming connections to other peers. These connections are made over +both the public Internet and private local area networks. This network defines +a fully connected directed graph of nodes. Peers send and receive messages to +other connected peers. This peer to peer network, layered on top of the public +and private Internet, forms an [_overlay network_][overlay_network]. + +## Bootstrapping When a peer comes online it needs a set of IP addresses to connect to in order to gain initial entry into the overlay in a process called _bootstrapping_. Once they diff --git a/src/ripple/peerfinder/api/Callback.h b/src/ripple/peerfinder/api/Callback.h index 592f41b61..28a196f89 100644 --- a/src/ripple/peerfinder/api/Callback.h +++ b/src/ripple/peerfinder/api/Callback.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED #define RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED +#include "Types.h" + namespace ripple { namespace PeerFinder { diff --git a/src/ripple/peerfinder/api/Manager.h b/src/ripple/peerfinder/api/Manager.h index 21bb066fb..5044d0a5b 100644 --- a/src/ripple/peerfinder/api/Manager.h +++ b/src/ripple/peerfinder/api/Manager.h @@ -20,9 +20,12 @@ #ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED #define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED +#include "Config.h" #include "Slot.h" #include "Types.h" +#include "../../sitefiles/api/Manager.h" + #include "../../beast/modules/beast_core/files/File.h" namespace ripple { diff --git a/src/ripple/peerfinder/api/Slot.h b/src/ripple/peerfinder/api/Slot.h index 2e25db132..5dbf823f7 100644 --- a/src/ripple/peerfinder/api/Slot.h +++ b/src/ripple/peerfinder/api/Slot.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_PEERFINDER_SLOT_H_INCLUDED #define RIPPLE_PEERFINDER_SLOT_H_INCLUDED +#include "../../types/api/RipplePublicKey.h" + #include "../../beast/beast/net/IPEndpoint.h" #include diff --git a/src/ripple/peerfinder/api/Types.h b/src/ripple/peerfinder/api/Types.h index 4a846a999..aa19f3446 100644 --- a/src/ripple/peerfinder/api/Types.h +++ b/src/ripple/peerfinder/api/Types.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_PEERFINDER_TYPES_H_INCLUDED #define RIPPLE_PEERFINDER_TYPES_H_INCLUDED +#include "Endpoint.h" + #include "../../beast/beast/chrono/abstract_clock.h" namespace ripple { diff --git a/src/ripple/resource/api/Consumer.h b/src/ripple/resource/api/Consumer.h index b0e249bba..4f827e476 100644 --- a/src/ripple/resource/api/Consumer.h +++ b/src/ripple/resource/api/Consumer.h @@ -20,9 +20,12 @@ #ifndef RIPPLE_RESOURCE_CONSUMER_H_INCLUDED #define RIPPLE_RESOURCE_CONSUMER_H_INCLUDED +#include "Disposition.h" + namespace ripple { namespace Resource { +struct Entry; class Logic; /** An endpoint that consumes resources. */ diff --git a/src/ripple/resource/api/Manager.h b/src/ripple/resource/api/Manager.h index 745ee2c76..9c8b1af28 100644 --- a/src/ripple/resource/api/Manager.h +++ b/src/ripple/resource/api/Manager.h @@ -20,6 +20,9 @@ #ifndef RIPPLE_RESOURCE_MANAGER_H_INCLUDED #define RIPPLE_RESOURCE_MANAGER_H_INCLUDED +#include "Consumer.h" +#include "Gossip.h" + #include "../../beast/beast/insight/Collector.h" #include "../../beast/beast/net/IPEndpoint.h" #include "../../beast/beast/utility/Journal.h" diff --git a/src/ripple/sitefiles/api/Listener.h b/src/ripple/sitefiles/api/Listener.h index 0102c1954..a55281d2b 100644 --- a/src/ripple/sitefiles/api/Listener.h +++ b/src/ripple/sitefiles/api/Listener.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_SITEFILES_LISTENER_H_INCLUDED #define RIPPLE_SITEFILES_LISTENER_H_INCLUDED +#include "SiteFile.h" + namespace ripple { namespace SiteFiles { diff --git a/src/ripple/sitefiles/api/Manager.h b/src/ripple/sitefiles/api/Manager.h index 04838d440..a3a693ffc 100644 --- a/src/ripple/sitefiles/api/Manager.h +++ b/src/ripple/sitefiles/api/Manager.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_SITEFILES_MANAGER_H_INCLUDED #define RIPPLE_SITEFILES_MANAGER_H_INCLUDED +#include "Listener.h" + #include "../../beast/beast/utility/PropertyStream.h" namespace ripple { diff --git a/src/ripple/sitefiles/api/Section.h b/src/ripple/sitefiles/api/Section.h index a777ec298..8eb99740b 100644 --- a/src/ripple/sitefiles/api/Section.h +++ b/src/ripple/sitefiles/api/Section.h @@ -20,6 +20,9 @@ #ifndef RIPPLE_SITEFILES_SECTION_H_INCLUDED #define RIPPLE_SITEFILES_SECTION_H_INCLUDED +#include +#include + namespace ripple { namespace SiteFiles { @@ -30,7 +33,7 @@ namespace SiteFiles { class Section { public: - typedef boost::unordered_map MapType; + typedef std::unordered_map MapType; typedef std::vector DataType; Section(int = 0); // dummy argument for emplace() diff --git a/src/ripple/sitefiles/api/SiteFile.h b/src/ripple/sitefiles/api/SiteFile.h index 421bf642f..3294748d4 100644 --- a/src/ripple/sitefiles/api/SiteFile.h +++ b/src/ripple/sitefiles/api/SiteFile.h @@ -20,6 +20,11 @@ #ifndef RIPPLE_SITEFILES_SITEFILE_H_INCLUDED #define RIPPLE_SITEFILES_SITEFILE_H_INCLUDED +#include "Section.h" + +#include +#include + namespace ripple { namespace SiteFiles { @@ -28,7 +33,7 @@ class SiteFile public: SiteFile (int = 0); // dummy argument for emplace - typedef boost::unordered_map SectionsType; + typedef std::unordered_map SectionsType; /** Retrieve a section by name. */ /** @{ */ diff --git a/src/ripple/types/api/RippleAccountID.h b/src/ripple/types/api/RippleAccountID.h index 455e4b791..bff1e3f57 100644 --- a/src/ripple/types/api/RippleAccountID.h +++ b/src/ripple/types/api/RippleAccountID.h @@ -21,6 +21,8 @@ #define RIPPLE_TYPES_RIPPLEACCOUNTID_H_INCLUDED #include "CryptoIdentifier.h" +#include "IdentifierType.h" + #include namespace ripple { diff --git a/src/ripple/types/api/RipplePublicKeyHash.h b/src/ripple/types/api/RipplePublicKeyHash.h index 4e74a18f6..8068b9dbb 100644 --- a/src/ripple/types/api/RipplePublicKeyHash.h +++ b/src/ripple/types/api/RipplePublicKeyHash.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_TYPES_RIPPLEPUBLICKEYHASH_H_INCLUDED #define RIPPLE_TYPES_RIPPLEPUBLICKEYHASH_H_INCLUDED +#include "SimpleIdentifier.h" + namespace ripple { /** Traits for the public key hash. */ diff --git a/src/ripple/types/api/UInt160.h b/src/ripple/types/api/UInt160.h index 106af5749..2fdcd6d19 100644 --- a/src/ripple/types/api/UInt160.h +++ b/src/ripple/types/api/UInt160.h @@ -26,6 +26,7 @@ #define RIPPLE_TYPES_UINT160_H_INCLUDED #include "base_uint.h" +#include "strHex.h" namespace ripple { diff --git a/src/ripple/types/api/UInt256.h b/src/ripple/types/api/UInt256.h index c3d4dacc0..3e725b351 100644 --- a/src/ripple/types/api/UInt256.h +++ b/src/ripple/types/api/UInt256.h @@ -26,6 +26,8 @@ #define RIPPLE_TYPES_UINT256_H_INCLUDED #include "base_uint.h" +#include "Blob.h" +#include "ByteOrder.h" #include diff --git a/src/ripple/types/api/base_uint.h b/src/ripple/types/api/base_uint.h index 21ad82c5d..04ace2ae3 100644 --- a/src/ripple/types/api/base_uint.h +++ b/src/ripple/types/api/base_uint.h @@ -25,6 +25,8 @@ #ifndef RIPPLE_TYPES_BASE_UINT_H_INCLUDED #define RIPPLE_TYPES_BASE_UINT_H_INCLUDED +#include "strHex.h" + #include "../../beast/beast/container/hardened_hash.h" #include diff --git a/src/ripple_app/consensus/LedgerConsensus.cpp b/src/ripple_app/consensus/LedgerConsensus.cpp index f1f167585..2a9a5982f 100644 --- a/src/ripple_app/consensus/LedgerConsensus.cpp +++ b/src/ripple_app/consensus/LedgerConsensus.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +#include "../../ripple_overlay/api/predicates.h" + namespace ripple { SETUP_LOG (LedgerConsensus) @@ -786,7 +788,7 @@ public: /** A peer has informed us that it can give us a transaction set */ - bool peerHasSet (Peer::ref peer, uint256 const& hashSet + bool peerHasSet (Peer::ptr const& peer, uint256 const& hashSet , protocol::TxSetStatus status) { if (status != protocol::tsHAVE) // Indirect requests for future support @@ -811,7 +813,7 @@ public: /** A peer has sent us some nodes from a transaction set */ - SHAMapAddNode peerGaveNodes (Peer::ref peer + SHAMapAddNode peerGaveNodes (Peer::ptr const& peer , uint256 const& setHash, const std::list& nodeIDs , const std::list< Blob >& nodeData) { @@ -963,8 +965,8 @@ private: Blob validation = v->getSigned (); protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); - getApp ().getPeers ().foreach (send_always ( - boost::make_shared ( + getApp ().overlay ().foreach (send_always ( + boost::make_shared ( val, protocol::mtVALIDATION))); WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash; @@ -1076,7 +1078,7 @@ private: while (pit != peerList.end ()) { - Peer::pointer pr = pit->lock (); + Peer::ptr pr = pit->lock (); if (!pr) { @@ -1100,14 +1102,14 @@ private: : acquire(acq) { } - return_type operator() (Peer::ref peer) const + return_type operator() (Peer::ptr const& peer) const { if (peer->hasTxSet (acquire->getHash ())) acquire->peerHas (peer); } }; - getApp().getPeers ().foreach (build_acquire_list (acquire)); + getApp().overlay ().foreach (build_acquire_list (acquire)); acquire->setTimer (); } @@ -1201,8 +1203,8 @@ private: msg.set_rawtransaction (& (tx.front ()), tx.size ()); msg.set_status (protocol::tsNEW); msg.set_receivetimestamp (getApp().getOPs ().getNetworkTimeNC ()); - getApp ().getPeers ().foreach (send_always ( - boost::make_shared ( + getApp ().overlay ().foreach (send_always ( + boost::make_shared ( msg, protocol::mtTRANSACTION))); } } @@ -1240,8 +1242,8 @@ private: Blob sig = mOurPosition->sign (); prop.set_nodepubkey (&pubKey[0], pubKey.size ()); prop.set_signature (&sig[0], sig.size ()); - getApp ().getPeers ().foreach (send_always ( - boost::make_shared ( + getApp ().overlay ().foreach (send_always ( + boost::make_shared ( prop, protocol::mtPROPOSE_LEDGER))); } @@ -1253,8 +1255,8 @@ private: protocol::TMHaveTransactionSet msg; msg.set_hash (hash.begin (), 256 / 8); msg.set_status (direct ? protocol::tsHAVE : protocol::tsCAN_GET); - getApp ().getPeers ().foreach (send_always ( - boost::make_shared ( + getApp ().overlay ().foreach (send_always ( + boost::make_shared ( msg, protocol::mtHAVE_SET))); } @@ -1458,8 +1460,8 @@ private: } s.set_firstseq (uMin); s.set_lastseq (uMax); - getApp ().getPeers ().foreach (send_always ( - boost::make_shared ( + getApp ().overlay ().foreach (send_always ( + boost::make_shared ( s, protocol::mtSTATUS_CHANGE))); WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer"; } @@ -1753,8 +1755,8 @@ private: closetime nodepubkey signature - getApp ().getPeers ().foreach (send_if_not ( - boost::make_shared ( + getApp ().overlay ().foreach (send_if_not ( + boost::make_shared ( set, protocol::mtPROPOSE_LEDGER), peer_in_set(peers))); } @@ -1819,8 +1821,8 @@ private: protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); #if 0 - getApp ().getPeers ().visit (RelayMessage ( - boost::make_shared ( + getApp ().overlay ().visit (RelayMessage ( + boost::make_shared ( val, protocol::mtVALIDATION))); #endif getApp().getOPs ().setLastValidation (v); diff --git a/src/ripple_app/consensus/LedgerConsensus.h b/src/ripple_app/consensus/LedgerConsensus.h index 2ca1bac53..50c5b2e1c 100644 --- a/src/ripple_app/consensus/LedgerConsensus.h +++ b/src/ripple_app/consensus/LedgerConsensus.h @@ -71,10 +71,10 @@ public: virtual bool peerPosition (LedgerProposal::ref) = 0; - virtual bool peerHasSet (Peer::ref peer, uint256 const & set, + virtual bool peerHasSet (Peer::ptr const& peer, uint256 const & set, protocol::TxSetStatus status) = 0; - virtual SHAMapAddNode peerGaveNodes (Peer::ref peer, + virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer, uint256 const & setHash, const std::list& nodeIDs, const std::list< Blob >& nodeData) = 0; diff --git a/src/ripple_app/ledger/InboundLedger.cpp b/src/ripple_app/ledger/InboundLedger.cpp index 68d4dd1f0..35be31cea 100644 --- a/src/ripple_app/ledger/InboundLedger.cpp +++ b/src/ripple_app/ledger/InboundLedger.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +#include "../../ripple_overlay/api/Overlay.h" + namespace ripple { //SETUP_LOG (InboundLedger) @@ -90,7 +92,7 @@ void InboundLedger::init (ScopedLockType& collectionLock) // For historical nodes, wait a bit since a // fetch pack is probably coming if (mReason != fcHISTORY) - trigger (Peer::pointer ()); + trigger (Peer::ptr ()); } else if (!isFailed ()) { @@ -261,7 +263,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) "No progress(" << pc << ") for ledger " << mHash; - trigger (Peer::pointer ()); + trigger (Peer::ptr ()); if (pc < 4) addPeers (); } @@ -270,7 +272,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) /** Add more peers to the set, if possible */ void InboundLedger::addPeers () { - Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers (); + Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers (); int vSize = peerList.size (); @@ -297,7 +299,7 @@ void InboundLedger::addPeers () // First look for peers that are likely to have this ledger for (int i = 0; i < vSize; ++i) { - Peer::ref peer = peerList[ (i + firstPeer) % vSize]; + Peer::ptr const& peer = peerList[ (i + firstPeer) % vSize]; if (peer->hasLedger (getHash (), mSeq)) { @@ -406,7 +408,7 @@ bool InboundLedger::addOnComplete ( /** Request more nodes, perhaps from a specific peer */ -void InboundLedger::trigger (Peer::ref peer) +void InboundLedger::trigger (Peer::ptr const& peer) { ScopedLockType sl (mLock); @@ -484,7 +486,7 @@ void InboundLedger::trigger (Peer::ref peer) } } - PackedMessage::pointer packet (boost::make_shared ( + Message::pointer packet (boost::make_shared ( tmBH, protocol::mtGET_OBJECTS)); { ScopedLockType sl (mLock); @@ -492,8 +494,8 @@ void InboundLedger::trigger (Peer::ref peer) for (PeerSetMap::iterator it = mPeers.begin (), end = mPeers.end (); it != end; ++it) { - Peer::pointer iPeer ( - getApp().getPeers ().findPeerByShortID (it->first)); + Peer::ptr iPeer ( + getApp().overlay ().findPeerByShortID (it->first)); if (iPeer) { @@ -1191,7 +1193,7 @@ void InboundLedger::runData () // breaking ties in favor of the peer that responded first. BOOST_FOREACH (PeerDataPairType& entry, data) { - Peer::pointer peer = entry.first.lock(); + Peer::ptr peer = entry.first.lock(); if (peer) { int count = processData (peer, *(entry.second)); diff --git a/src/ripple_app/ledger/InboundLedger.h b/src/ripple_app/ledger/InboundLedger.h index 7bc0ce45f..f278140a6 100644 --- a/src/ripple_app/ledger/InboundLedger.h +++ b/src/ripple_app/ledger/InboundLedger.h @@ -82,7 +82,7 @@ public: // VFALCO TODO Make this the Listener / Observer pattern bool addOnComplete (std::function); - void trigger (Peer::ref); + void trigger (Peer::ptr const&); bool tryLocal (); void addPeers (); bool checkLocal (); @@ -106,7 +106,7 @@ private: void onTimer (bool progress, ScopedLockType& peerSetLock); - void newPeer (Peer::ref peer) + void newPeer (Peer::ptr const& peer) { trigger (peer); } diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index d1f1ad36f..5580b8e86 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -512,11 +512,11 @@ public: */ void getFetchPack (Ledger::ref nextLedger) { - Peer::pointer target; + Peer::ptr target; int count = 0; - Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers (); - BOOST_FOREACH (const Peer::pointer & peer, peerList) + Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers (); + BOOST_FOREACH (const Peer::ptr & peer, peerList) { if (peer->hasRange (nextLedger->getLedgerSeq() - 1, nextLedger->getLedgerSeq())) { @@ -533,7 +533,7 @@ public: tmBH.set_query (true); tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK); tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32); - PackedMessage::pointer packet = boost::make_shared (tmBH, protocol::mtGET_OBJECTS); + Message::pointer packet = boost::make_shared (tmBH, protocol::mtGET_OBJECTS); target->sendPacket (packet, false); WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1; diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 49216c615..f9a5d72b1 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -19,6 +19,7 @@ #include "../ripple/common/seconds_clock.h" #include "../ripple_rpc/api/Manager.h" +#include "../ripple_overlay/api/make_Overlay.h" #include "Tuning.h" @@ -153,7 +154,7 @@ public: std::unique_ptr m_peerSSLContext; std::unique_ptr m_wsSSLContext; - std::unique_ptr m_peers; + std::unique_ptr m_peers; std::unique_ptr m_rpcDoor; std::unique_ptr m_wsPublicDoor; std::unique_ptr m_wsPrivateDoor; @@ -482,7 +483,7 @@ public: return *mProofOfWorkFactory; } - Peers& getPeers () + Overlay& overlay () { return *m_peers; } @@ -682,13 +683,15 @@ public: } // VFALCO NOTE Unfortunately, in stand-alone mode some code still - // foolishly calls getPeers(). When this is fixed we can + // foolishly calls overlay(). When this is fixed we can // move the instantiation inside a conditional: // // if (!getConfig ().RUN_STANDALONE) - m_peers.reset (add (Peers::New (m_mainIoPool, *m_resourceManager, + m_peers = make_Overlay (m_mainIoPool, *m_resourceManager, *m_siteFiles, getConfig ().getModuleDatabasePath (), - *m_resolver, m_mainIoPool, m_peerSSLContext->get ()))); + *m_resolver, m_mainIoPool, m_peerSSLContext->get ()); + // add to Stoppable + add (*m_peers); // SSL context used for WebSocket connections. if (getConfig ().WEBSOCKET_SECURE) diff --git a/src/ripple_app/main/Application.h b/src/ripple_app/main/Application.h index 29946237e..b05e366d7 100644 --- a/src/ripple_app/main/Application.h +++ b/src/ripple_app/main/Application.h @@ -36,7 +36,7 @@ class IFeatures; class IFeeVote; class IHashRouter; class LoadFeeTrack; -class Peers; +class Overlay; class UniqueNodeList; class JobQueue; class InboundLedgers; @@ -94,7 +94,7 @@ public: virtual IHashRouter& getHashRouter () = 0; virtual LoadFeeTrack& getFeeTrack () = 0; virtual LoadManager& getLoadManager () = 0; - virtual Peers& getPeers () = 0; + virtual Overlay& overlay () = 0; virtual ProofOfWorkFactory& getProofOfWorkFactory () = 0; virtual UniqueNodeList& getUNL () = 0; virtual Validations& getValidations () = 0; diff --git a/src/ripple_app/misc/NetworkOPs.cpp b/src/ripple_app/misc/NetworkOPs.cpp index c2d0fbd9a..87a4db20b 100644 --- a/src/ripple_app/misc/NetworkOPs.cpp +++ b/src/ripple_app/misc/NetworkOPs.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +#include "../../ripple_overlay/api/predicates.h" + #include "../../beast/modules/beast_core/thread/DeadlineTimer.h" #include "../../beast/modules/beast_core/system/SystemStats.h" @@ -245,7 +247,7 @@ public: // VFALCO TODO Try to make all these private since they seem to be...private // void switchLastClosedLedger (Ledger::pointer newLedger, bool duringConsensus); // Used for the "jump" case - bool checkLastClosedLedger (const Peers::PeerSequence&, uint256& networkClosed); + bool checkLastClosedLedger (const Overlay::PeerSequence&, uint256& networkClosed); int beginConsensus (uint256 const& networkClosed, Ledger::pointer closingLedger); void tryStartConsensus (); void endConsensus (bool correctLCL); @@ -539,7 +541,7 @@ void NetworkOPsImp::processHeartbeatTimer () LoadManager& mgr (app.getLoadManager ()); mgr.resetDeadlockDetector (); - std::size_t const numPeers = getApp().getPeers ().size (); + std::size_t const numPeers = getApp().overlay ().size (); // do we have sufficient peers? If not, we are disconnected. if (numPeers < getConfig ().NETWORK_QUORUM) @@ -614,8 +616,8 @@ void NetworkOPsImp::processClusterTimer () node.set_name (to_string (item.address)); node.set_cost (item.balance); } - getApp ().getPeers ().foreach (send_if ( - boost::make_shared(cluster, protocol::mtCLUSTER), + getApp ().overlay ().foreach (send_if ( + boost::make_shared(cluster, protocol::mtCLUSTER), peer_in_cluster ())); setClusterTimer (); } @@ -889,8 +891,8 @@ void NetworkOPsImp::runTransactionQueue () tx.set_rawtransaction (&s.getData ().front (), s.getLength ()); tx.set_status (protocol::tsCURRENT); tx.set_receivetimestamp (getNetworkTimeNC ()); // FIXME: This should be when we received it - getApp ().getPeers ().foreach (send_if_not ( - boost::make_shared (tx, protocol::mtTRANSACTION), + getApp ().overlay ().foreach (send_if_not ( + boost::make_shared (tx, protocol::mtTRANSACTION), peer_in_set(peers))); } else @@ -1020,8 +1022,8 @@ Transaction::pointer NetworkOPsImp::processTransactionCb ( tx.set_rawtransaction (&s.getData ().front (), s.getLength ()); tx.set_status (protocol::tsCURRENT); tx.set_receivetimestamp (getNetworkTimeNC ()); // FIXME: This should be when we received it - getApp ().getPeers ().foreach (send_if_not ( - boost::make_shared (tx, protocol::mtTRANSACTION), + getApp ().overlay ().foreach (send_if_not ( + boost::make_shared (tx, protocol::mtTRANSACTION), peer_in_set(peers))); } } @@ -1221,7 +1223,7 @@ public: void NetworkOPsImp::tryStartConsensus () { uint256 networkClosed; - bool ledgerChange = checkLastClosedLedger (getApp().getPeers ().getActivePeers (), networkClosed); + bool ledgerChange = checkLastClosedLedger (getApp().overlay ().getActivePeers (), networkClosed); if (networkClosed.isZero ()) return; @@ -1253,7 +1255,7 @@ void NetworkOPsImp::tryStartConsensus () beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ()); } -bool NetworkOPsImp::checkLastClosedLedger (const Peers::PeerSequence& peerList, uint256& networkClosed) +bool NetworkOPsImp::checkLastClosedLedger (const Overlay::PeerSequence& peerList, uint256& networkClosed) { // Returns true if there's an *abnormal* ledger issue, normal changing in TRACKING mode should return false // Do we have sufficient validations for our last closed ledger? Or do sufficient nodes @@ -1298,7 +1300,7 @@ bool NetworkOPsImp::checkLastClosedLedger (const Peers::PeerSequence& peerList, ourVC.highNodeUsing = ourAddress; } - BOOST_FOREACH (Peer::ref peer, peerList) + BOOST_FOREACH (Peer::ptr const& peer, peerList) { uint256 peerLedger = peer->getClosedLedgerHash (); @@ -1430,8 +1432,8 @@ void NetworkOPsImp::switchLastClosedLedger (Ledger::pointer newLedger, bool duri hash = newLedger->getHash (); s.set_ledgerhash (hash.begin (), hash.size ()); - getApp ().getPeers ().foreach (send_always ( - boost::make_shared (s, protocol::mtSTATUS_CHANGE))); + getApp ().overlay ().foreach (send_always ( + boost::make_shared (s, protocol::mtSTATUS_CHANGE))); } int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer closingLedger) @@ -1481,7 +1483,7 @@ bool NetworkOPsImp::haveConsensusObject () { // we need to get into the consensus process uint256 networkClosed; - Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers (); + Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers (); bool ledgerChange = checkLastClosedLedger (peerList, networkClosed); if (!ledgerChange) @@ -1549,8 +1551,8 @@ void NetworkOPsImp::processTrustedProposal (LedgerProposal::pointer proposal, if (getApp().getHashRouter ().swapSet ( proposal->getSuppressionID (), peers, SF_RELAYED)) { - getApp ().getPeers ().foreach (send_if_not ( - boost::make_shared (*set, protocol::mtPROPOSE_LEDGER), + getApp ().overlay ().foreach (send_if_not ( + boost::make_shared (*set, protocol::mtPROPOSE_LEDGER), peer_in_set(peers))); } } @@ -1640,9 +1642,9 @@ void NetworkOPsImp::endConsensus (bool correctLCL) { uint256 deadLedger = m_ledgerMaster.getClosedLedger ()->getParentHash (); - std::vector peerList = getApp().getPeers ().getActivePeers (); + std::vector peerList = getApp().overlay ().getActivePeers (); - BOOST_FOREACH (Peer::ref it, peerList) + BOOST_FOREACH (Peer::ptr const& it, peerList) { if (it && (it->getClosedLedgerHash () == deadLedger)) { @@ -2203,7 +2205,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) if (fp != 0) info["fetch_pack"] = Json::UInt (fp); - info["peers"] = Json::UInt (getApp ().getPeers ().size ()); + info["peers"] = Json::UInt (getApp ().overlay ().size ()); Json::Value lastClose = Json::objectValue; lastClose["proposers"] = getApp().getOPs ().getPreviousProposers (); @@ -3193,7 +3195,7 @@ void NetworkOPsImp::makeFetchPack (Job&, boost::weak_ptr wPeer, try { - Peer::pointer peer = wPeer.lock (); + Peer::ptr peer = wPeer.lock (); if (!peer) return; @@ -3236,7 +3238,7 @@ void NetworkOPsImp::makeFetchPack (Job&, boost::weak_ptr wPeer, while (wantLedger && (UptimeTimer::getInstance ().getElapsedSeconds () <= (uUptime + 1))); m_journal.info << "Built fetch pack with " << reply.objects ().size () << " nodes"; - PackedMessage::pointer msg = boost::make_shared (reply, protocol::mtGET_OBJECTS); + Message::pointer msg = boost::make_shared (reply, protocol::mtGET_OBJECTS); peer->sendPacket (msg, false); } catch (...) diff --git a/src/ripple_app/peers/PeerSet.cpp b/src/ripple_app/peers/PeerSet.cpp index e9f083605..582adb1fe 100644 --- a/src/ripple_app/peers/PeerSet.cpp +++ b/src/ripple_app/peers/PeerSet.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +#include"../../ripple_overlay/api/Overlay.h" + namespace ripple { class InboundLedger; @@ -50,7 +52,7 @@ PeerSet::~PeerSet () { } -bool PeerSet::peerHas (Peer::ref ptr) +bool PeerSet::peerHas (Peer::ptr const& ptr) { ScopedLockType sl (mLock); @@ -61,7 +63,7 @@ bool PeerSet::peerHas (Peer::ref ptr) return true; } -void PeerSet::badPeer (Peer::ref ptr) +void PeerSet::badPeer (Peer::ptr const& ptr) { ScopedLockType sl (mLock); mPeers.erase (ptr->getShortId ()); @@ -141,12 +143,12 @@ bool PeerSet::isActive () return !isDone (); } -void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ref peer) +void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ptr const& peer) { if (!peer) sendRequest (tmGL); else - peer->sendPacket (boost::make_shared (tmGL, protocol::mtGET_LEDGER), false); + peer->sendPacket (boost::make_shared (tmGL, protocol::mtGET_LEDGER), false); } void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL) @@ -156,12 +158,12 @@ void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL) if (mPeers.empty ()) return; - PackedMessage::pointer packet ( - boost::make_shared (tmGL, protocol::mtGET_LEDGER)); + Message::pointer packet ( + boost::make_shared (tmGL, protocol::mtGET_LEDGER)); for (auto const& p : mPeers) { - Peer::pointer peer (getApp().getPeers ().findPeerByShortID (p.first)); + Peer::ptr peer (getApp().overlay ().findPeerByShortID (p.first)); if (peer) peer->sendPacket (packet, false); @@ -188,7 +190,7 @@ std::size_t PeerSet::getPeerCount () const for (auto const& p : mPeers) { - if (getApp ().getPeers ().findPeerByShortID (p.first)) + if (getApp ().overlay ().findPeerByShortID (p.first)) ++ret; } diff --git a/src/ripple_app/peers/PeerSet.h b/src/ripple_app/peers/PeerSet.h index 7286f73b0..b80a391ee 100644 --- a/src/ripple_app/peers/PeerSet.h +++ b/src/ripple_app/peers/PeerSet.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_PEERSET_H #define RIPPLE_PEERSET_H +#include "../../ripple_overlay/api/Peer.h" + namespace ripple { /** A set of peers used to acquire data. @@ -81,15 +83,15 @@ public: // VFALCO TODO Rename this to addPeerToSet // - bool peerHas (Peer::ref); + bool peerHas (Peer::ptr const&); // VFALCO Workaround for MSVC std::function which doesn't swallow return types. - void peerHasVoid (Peer::ref peer) + void peerHasVoid (Peer::ptr const& peer) { peerHas (peer); } - void badPeer (Peer::ref); + void badPeer (Peer::ptr const&); void setTimer (); @@ -113,7 +115,7 @@ protected: clock_type& clock, beast::Journal journal); virtual ~PeerSet () = 0; - virtual void newPeer (Peer::ref) = 0; + virtual void newPeer (Peer::ptr const&) = 0; virtual void onTimer (bool progress, ScopedLockType&) = 0; virtual boost::weak_ptr pmDowncast () = 0; @@ -128,7 +130,7 @@ protected: void invokeOnTimer (); void sendRequest (const protocol::TMGetLedger& message); - void sendRequest (const protocol::TMGetLedger& message, Peer::ref peer); + void sendRequest (const protocol::TMGetLedger& message, Peer::ptr const& peer); protected: beast::Journal m_journal; diff --git a/src/ripple_app/ripple_app.h b/src/ripple_app/ripple_app.h index aeee4d03d..a712a2e25 100644 --- a/src/ripple_app/ripple_app.h +++ b/src/ripple_app/ripple_app.h @@ -33,7 +33,6 @@ #include #include #include -//#include #include #include #include @@ -53,7 +52,7 @@ #include "../ripple/common/ResolverAsio.h" -//#include "../beast/modules/beast_sqdb/beast_sqdb.h" +// VFALCO TODO Remove this include #include "../beast/modules/beast_sqlite/beast_sqlite.h" // Order matters here. If you get compile errors, @@ -62,8 +61,6 @@ #include "../../ripple/common/KeyCache.h" #include "../../ripple/common/TaggedCache.h" -#include "../../ripple_overlay/ripple_overlay.h" - #include "data/Database.h" #include "data/DatabaseCon.h" #include "data/SqliteDatabase.h" diff --git a/src/ripple_app/rpc/RPCHandler.cpp b/src/ripple_app/rpc/RPCHandler.cpp index efaa4125a..7c55a79d4 100644 --- a/src/ripple_app/rpc/RPCHandler.cpp +++ b/src/ripple_app/rpc/RPCHandler.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +#include "../../ripple_overlay/api/Overlay.h" + #include "../../beast/beast/unit_test/suite.h" namespace ripple { @@ -863,7 +865,7 @@ Json::Value RPCHandler::doConnect (Json::Value params, beast::IP::Endpoint ip (beast::IP::Endpoint::from_string(params["ip"].asString ())); if (! is_unspecified (ip)) - getApp().getPeers ().connect (ip.at_port(iPort)); + getApp().overlay ().connect (ip.at_port(iPort)); return "connecting"; } @@ -934,7 +936,7 @@ Json::Value RPCHandler::doPeers (Json::Value, Resource::Charge& loadType, Applic { Json::Value jvResult (Json::objectValue); - jvResult["peers"] = getApp().getPeers ().json (); + jvResult["peers"] = getApp().overlay ().json (); getApp().getUNL().addClusterStatus(jvResult); diff --git a/src/ripple_app/tx/TransactionAcquire.cpp b/src/ripple_app/tx/TransactionAcquire.cpp index 75b6ea385..6a6739dcf 100644 --- a/src/ripple_app/tx/TransactionAcquire.cpp +++ b/src/ripple_app/tx/TransactionAcquire.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +#include "../../ripple_overlay/api/Overlay.h" + namespace ripple { //SETUP_LOG (TransactionAcquire) @@ -110,8 +112,8 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl) WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash (); bool found = false; - Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers (); - BOOST_FOREACH (Peer::ref peer, peerList) + Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers (); + BOOST_FOREACH (Peer::ptr const& peer, peerList) { if (peer->hasTxSet (getHash ())) { @@ -122,12 +124,12 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl) if (!found) { - BOOST_FOREACH (Peer::ref peer, peerList) + BOOST_FOREACH (Peer::ptr const& peer, peerList) peerHas (peer); } } else if (!progress) - trigger (Peer::pointer ()); + trigger (Peer::ptr ()); } boost::weak_ptr TransactionAcquire::pmDowncast () @@ -135,7 +137,7 @@ boost::weak_ptr TransactionAcquire::pmDowncast () return boost::dynamic_pointer_cast (shared_from_this ()); } -void TransactionAcquire::trigger (Peer::ref peer) +void TransactionAcquire::trigger (Peer::ptr const& peer) { if (mComplete) { @@ -196,7 +198,7 @@ void TransactionAcquire::trigger (Peer::ref peer) } SHAMapAddNode TransactionAcquire::takeNodes (const std::list& nodeIDs, - const std::list< Blob >& data, Peer::ref peer) + const std::list< Blob >& data, Peer::ptr const& peer) { if (mComplete) { diff --git a/src/ripple_app/tx/TransactionAcquire.h b/src/ripple_app/tx/TransactionAcquire.h index 148ada131..e4c4cfaff 100644 --- a/src/ripple_app/tx/TransactionAcquire.h +++ b/src/ripple_app/tx/TransactionAcquire.h @@ -44,20 +44,20 @@ public: } SHAMapAddNode takeNodes (const std::list& IDs, - const std::list< Blob >& data, Peer::ref); + const std::list< Blob >& data, Peer::ptr const&); private: SHAMap::pointer mMap; bool mHaveRoot; void onTimer (bool progress, ScopedLockType& peerSetLock); - void newPeer (Peer::ref peer) + void newPeer (Peer::ptr const& peer) { trigger (peer); } void done (); - void trigger (Peer::ref); + void trigger (Peer::ptr const&); boost::weak_ptr pmDowncast (); }; diff --git a/src/ripple_core/nodestore/api/NodeObject.h b/src/ripple_core/nodestore/api/NodeObject.h index a721f0af3..c7518bc4d 100644 --- a/src/ripple_core/nodestore/api/NodeObject.h +++ b/src/ripple_core/nodestore/api/NodeObject.h @@ -61,14 +61,6 @@ public: keyBytes = 32, }; - /** The type used to hold the hash. - - The hahes are fixed size, SHA256. - - @note The key size can be retrieved with `Hash::sizeInBytes` - */ - typedef beast::UnsignedInteger <32> Hash; - // Please use this one. For a reference use Ptr const& typedef boost::shared_ptr Ptr; diff --git a/src/ripple_data/crypto/Base58Data.h b/src/ripple_data/crypto/Base58Data.h index 0743a6aef..b01d6d6dc 100644 --- a/src/ripple_data/crypto/Base58Data.h +++ b/src/ripple_data/crypto/Base58Data.h @@ -32,6 +32,8 @@ #ifndef RIPPLE_BASE58DATA_H #define RIPPLE_BASE58DATA_H +#include ".././ripple/types/api/Base58.h" + namespace ripple { class CBase58Data diff --git a/src/ripple_data/protocol/Protocol.h b/src/ripple_data/protocol/Protocol.h index 1d196a8ac..4b18f15b0 100644 --- a/src/ripple_data/protocol/Protocol.h +++ b/src/ripple_data/protocol/Protocol.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_PROTOCOL_H #define RIPPLE_PROTOCOL_H +#include + namespace ripple { /** Protocol specific constants, types, and data. diff --git a/src/ripple_data/protocol/RippleAddress.h b/src/ripple_data/protocol/RippleAddress.h index 33187b239..8bb1e54cd 100644 --- a/src/ripple_data/protocol/RippleAddress.h +++ b/src/ripple_data/protocol/RippleAddress.h @@ -20,8 +20,20 @@ #ifndef RIPPLE_RIPPLEADDRESS_H #define RIPPLE_RIPPLEADDRESS_H +#include "../crypto/Base58Data.h" + +#include "../ripple/types/api/UInt160.h" +#include "../ripple/types/api/RippleAccountID.h" +#include "../ripple/types/api/RippleAccountPrivateKey.h" +#include "../ripple/types/api/RippleAccountPublicKey.h" +#include "../ripple/types/api/RipplePrivateKey.h" +#include "../ripple/types/api/RipplePublicKey.h" +#include "../ripple/types/api/RipplePublicKeyHash.h" #include "../ripple/sslutil/api/ECDSACanonical.h" +struct bignum_st; +typedef struct bignum_st BIGNUM; + namespace ripple { // diff --git a/src/ripple_data/protocol/ripple.proto b/src/ripple_data/protocol/ripple.proto index f174879a0..f1c8ec1ed 100644 --- a/src/ripple_data/protocol/ripple.proto +++ b/src/ripple_data/protocol/ripple.proto @@ -226,7 +226,7 @@ message TMGetContacts message TMGetPeers { - required uint32 doWeNeedThis = 1; // yes since you are asserting that the packet size isn't 0 in PackedMessage + required uint32 doWeNeedThis = 1; // yes since you are asserting that the packet size isn't 0 in Message } message TMIPv4Endpoint diff --git a/src/ripple_overlay/README.md b/src/ripple_overlay/README.md index 49b21cf95..edb3f862d 100644 --- a/src/ripple_overlay/README.md +++ b/src/ripple_overlay/README.md @@ -1,4 +1,16 @@ # Overlay -A module which manages peer connections that operate the _Ripple -peer protocol_. +## Introduction + +The _Ripple payment network_ consists of a collection of _peers_ running the +**rippled software**. Each peer maintains multiple outgoing connections and +optional incoming connections to other peers. These connections are made over +both the public Internet and private local area networks. This network defines +a fully connected directed graph of nodes. Peers send and receive messages to +other connected peers. This peer to peer network, layered on top of the public +and private Internet, forms an [_overlay network_][overlay_network]. The +contents of the messages and the behavior of peers in response to the messages, +plus the information exchanged during the handshaking phase of connection +establishment, defines the _Peer Protocol_. + +[overlay_network]: http://en.wikipedia.org/wiki/Overlay_network \ No newline at end of file diff --git a/src/ripple_overlay/api/PackedMessage.h b/src/ripple_overlay/api/Message.h similarity index 70% rename from src/ripple_overlay/api/PackedMessage.h rename to src/ripple_overlay/api/Message.h index 0241d93b4..99796ba8a 100644 --- a/src/ripple_overlay/api/PackedMessage.h +++ b/src/ripple_overlay/api/Message.h @@ -17,8 +17,8 @@ */ //============================================================================== -#ifndef RIPPLE_OVERLAY_PACKEDMESSAGE_H_INCLUDED -#define RIPPLE_OVERLAY_PACKEDMESSAGE_H_INCLUDED +#ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED +#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED #include "ripple.pb.h" @@ -28,7 +28,7 @@ namespace ripple { -// VFALCO NOTE If we forward declare PackedMessage and write out shared_ptr +// VFALCO NOTE If we forward declare Message and write out shared_ptr // instead of using the in-class typedef, we can remove the entire // ripple.pb.h from the main headers. // @@ -36,42 +36,37 @@ namespace ripple { // packaging of messages into length/type-prepended buffers // ready for transmission. // -// PackedMessage implements simple "packing" of protocol buffers Messages into +// Message implements simple "packing" of protocol buffers Messages into // a string prepended by a header specifying the message length. // MessageType should be a Message class generated by the protobuf compiler. // -class PackedMessage : public boost::enable_shared_from_this +class Message : public boost::enable_shared_from_this { public: - typedef boost::shared_ptr< ::google::protobuf::Message > MessagePointer; - typedef boost::shared_ptr pointer; + typedef boost::shared_ptr pointer; public: /** Number of bytes in a message header. */ static size_t const kHeaderBytes = 6; - PackedMessage (::google::protobuf::Message const& message, int type); + Message (::google::protobuf::Message const& message, int type); - /** Retrieve the packed message data. - */ - // VFALCO TODO shouldn't this be const? - std::vector & getBuffer () + /** Retrieve the packed message data. */ + std::vector const& + getBuffer () const { return mBuffer; } - /** Determine bytewise equality. - */ - bool operator == (PackedMessage const& other) const; + /** Determine bytewise equality. */ + bool operator == (Message const& other) const; - /** Calculate the length of a packed message. - */ + /** Calculate the length of a packed message. */ static unsigned getLength (std::vector const& buf); - /** Determine the type of a packed message. - */ + /** Determine the type of a packed message. */ static int getType (std::vector const& buf); private: diff --git a/src/ripple_overlay/api/Overlay.h b/src/ripple_overlay/api/Overlay.h new file mode 100644 index 000000000..6d1be9083 --- /dev/null +++ b/src/ripple_overlay/api/Overlay.h @@ -0,0 +1,131 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_OVERLAY_H_INCLUDED +#define RIPPLE_OVERLAY_OVERLAY_H_INCLUDED + +#include "Peer.h" + +// VFALCO TODO Remove this include dependency it shouldn't be needed +#include "../../ripple/peerfinder/api/Slot.h" + +#include "../../beast/beast/threads/Stoppable.h" +#include "../../beast/beast/utility/PropertyStream.h" + +#include "../../beast/beast/cxx14/type_traits.h" // + +namespace ripple { + +/** Manages the set of connected peers. */ +class Overlay + : public beast::Stoppable + , public beast::PropertyStream::Source +{ +protected: + // VFALCO NOTE The requirement of this constructor is an + // unfortunate problem with the API for + // Stoppable and PropertyStream + // + Overlay (Stoppable& parent) + : Stoppable ("Overlay", parent) + , beast::PropertyStream::Source ("peers") + { + + } + +public: + typedef std::vector PeerSequence; + + virtual ~Overlay () = default; + + // VFALCO NOTE These should be a private API + /** @{ */ + virtual void remove (PeerFinder::Slot::ptr const& slot) = 0; + /** @} */ + + virtual void connect (beast::IP::Endpoint const& address) = 0; + + // Notification that a peer has connected. + virtual void onPeerActivated (Peer::ptr const& peer) = 0; + + // Notification that a peer has disconnected. + virtual void onPeerDisconnect (Peer::ptr const& peer) = 0; + + virtual std::size_t size () = 0; + virtual Json::Value json () = 0; + virtual PeerSequence getActivePeers () = 0; + + // Peer 64-bit ID function + virtual Peer::ptr findPeerByShortID (Peer::ShortId const& id) = 0; + + /** Visit every active peer and return a value + The functor must: + - Be callable as: + void operator()(Peer::ptr const& peer); + - Must have the following typedef: + typedef void return_type; + - Be callable as: + Function::return_type operator()() const; + + @param f the functor to call with every peer + @returns `f()` + + @note The functor is passed by value! + */ + template + std::enable_if_t < + ! std::is_void ::value, + typename Function::return_type + > + foreach(Function f) + { + PeerSequence peers (getActivePeers()); + + for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) + f (*i); + + return f(); + } + + /** Visit every active peer + The visitor functor must: + - Be callable as: + void operator()(Peer::ptr const& peer); + - Must have the following typedef: + typedef void return_type; + + @param f the functor to call with every peer + */ + template + std::enable_if_t < + std::is_void ::value, + typename Function::return_type + > + foreach(Function f) + { + PeerSequence peers (getActivePeers()); + + for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) + f (*i); + } +}; + +} + +#endif diff --git a/src/ripple_overlay/api/Peer.h b/src/ripple_overlay/api/Peer.h index 1b445f047..85c332d2b 100644 --- a/src/ripple_overlay/api/Peer.h +++ b/src/ripple_overlay/api/Peer.h @@ -20,81 +20,60 @@ #ifndef RIPPLE_OVERLAY_PEER_H_INCLUDED #define RIPPLE_OVERLAY_PEER_H_INCLUDED -#include "../../beast/beast/net/IPEndpoint.h" +#include "Message.h" -#include +#include "../../ripple/json/ripple_json.h" +#include "../../ripple/types/api/UInt256.h" +#include "../../ripple_data/protocol/RippleAddress.h" + +#include "../../beast/beast/net/IPEndpoint.h" namespace ripple { -typedef boost::asio::ip::tcp::socket NativeSocketType; - namespace Resource { class Charge; -class Manager; } -namespace PeerFinder { -class Manager; -} - -class Peers; - -/** Represents a peer connection in the overlay. -*/ -class Peer : private beast::LeakChecked +/** Represents a peer connection in the overlay. */ +class Peer { public: - typedef boost::shared_ptr Ptr; - - // DEPRECATED typedefs. - typedef boost::shared_ptr pointer; - typedef pointer const& ref; - - /** Uniquely identifies a particular connection of a peer - This works upto a restart of rippled. - */ + typedef boost::shared_ptr ptr; + + /** Uniquely identifies a particular connection of a peer. */ typedef std::uint32_t ShortId; - virtual void sendPacket (const PackedMessage::pointer& packet, bool onStrand) = 0; - - /** Adjust this peer's load balance based on the type of load imposed. - - @note Formerly named punishPeer - */ - virtual void charge (Resource::Charge const& fee) = 0; - static void charge (boost::weak_ptr & peer, Resource::Charge const& fee); - - virtual Json::Value json () = 0; - - virtual bool isInCluster () const = 0; - - virtual std::string getClusterNodeName() const = 0; - - virtual uint256 const& getClosedLedgerHash () const = 0; - - virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0; - - virtual void getLedger (protocol::TMGetLedger &) = 0; - - virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0; - - virtual bool hasTxSet (uint256 const& hash) const = 0; - - virtual void setShortId(Peer::ShortId shortId) = 0; - - virtual ShortId getShortId () const = 0; - - virtual const RippleAddress& getNodePublic () const = 0; - - virtual void cycleStatus () = 0; - - virtual bool supportsVersion (int version) = 0; - - virtual bool hasRange (std::uint32_t uMin, std::uint32_t uMax) = 0; + // + // Network + // + virtual void sendPacket (const Message::pointer& packet, bool onStrand) = 0; virtual beast::IP::Endpoint getRemoteAddress() const = 0; - virtual NativeSocketType& getNativeSocket () = 0; + /** Adjust this peer's load balance based on the type of load imposed. */ + virtual void charge (Resource::Charge const& fee) = 0; + + // + // Identity + // + + virtual ShortId getShortId () const = 0; + virtual const RippleAddress& getNodePublic () const = 0; + virtual Json::Value json () = 0; + virtual bool isInCluster () const = 0; + virtual std::string getClusterNodeName() const = 0; + + // + // Ledger + // + + virtual uint256 const& getClosedLedgerHash () const = 0; + virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0; + virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0; + virtual bool hasTxSet (uint256 const& hash) const = 0; + virtual void cycleStatus () = 0; + virtual bool supportsVersion (int version) = 0; + virtual bool hasRange (std::uint32_t uMin, std::uint32_t uMax) = 0; }; } diff --git a/src/ripple_overlay/api/Peers.h b/src/ripple_overlay/api/Peers.h deleted file mode 100644 index ceab544f3..000000000 --- a/src/ripple_overlay/api/Peers.h +++ /dev/null @@ -1,300 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_PEERS_H_INCLUDED -#define RIPPLE_PEERS_H_INCLUDED - -// VFALCO TODO Remove this include dependency it shouldn't be needed -#include "../../ripple/peerfinder/api/Slot.h" -#include "../../ripple/common/Resolver.h" - -#include "../../beast/beast/threads/Stoppable.h" -#include "../../beast/modules/beast_core/files/File.h" - -#include -#include - -#include - -namespace ripple { - -namespace PeerFinder { -struct Endpoint; -class Manager; -} - -namespace Resource { -class Manager; -} - -namespace SiteFiles { -class Manager; -} - -//------------------------------------------------------------------------------ - -/** Manages the set of connected peers. */ -class Peers - : public beast::Stoppable - , public beast::PropertyStream::Source -{ -protected: - // VFALCO NOTE The requirement of this constructor is an - // unfortunate problem with the API for - // Stoppable and PropertyStream - // - Peers (Stoppable& parent) - : Stoppable ("Peers", parent) - , beast::PropertyStream::Source ("peers") - { - - } - -public: - typedef std::vector PeerSequence; - - static Peers* New ( - Stoppable& parent, - Resource::Manager& resourceManager, - SiteFiles::Manager& siteFiles, - beast::File const& pathToDbFileOrDirectory, - Resolver& resolver, - boost::asio::io_service& io_service, - boost::asio::ssl::context& context); - - virtual ~Peers () = 0; - - // VFALCO NOTE These should be a private API - /** @{ */ - virtual void remove (PeerFinder::Slot::ptr const& slot) = 0; - /** @} */ - - virtual void accept (bool proxyHandshake, - boost::shared_ptr const& socket) = 0; - - virtual void connect (beast::IP::Endpoint const& address) = 0; - - // Notification that a peer has connected. - virtual void onPeerActivated (Peer::ref peer) = 0; - - // Notification that a peer has disconnected. - virtual void onPeerDisconnect (Peer::ref peer) = 0; - - virtual std::size_t size () = 0; - virtual Json::Value json () = 0; - virtual PeerSequence getActivePeers () = 0; - - // Peer 64-bit ID function - virtual Peer::pointer findPeerByShortID (Peer::ShortId const& id) = 0; - - /** Visit every active peer and return a value - The functor must: - - Be callable as: - void operator()(Peer::ref peer); - - Must have the following typedef: - typedef void return_type; - - Be callable as: - Function::return_type operator()() const; - - @param f the functor to call with every peer - @returns `f()` - - @note The functor is passed by value! - */ - template - typename boost::disable_if < - boost::is_void , - typename Function::return_type>::type - foreach(Function f) - { - PeerSequence peers (getActivePeers()); - - for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) - f (*i); - - return f(); - } - - /** Visit every active peer - The visitor functor must: - - Be callable as: - void operator()(Peer::ref peer); - - Must have the following typedef: - typedef void return_type; - - @param f the functor to call with every peer - */ - template - typename boost::enable_if < - boost::is_void , - typename Function::return_type>::type - foreach(Function f) - { - PeerSequence peers (getActivePeers()); - - for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) - f (*i); - } -}; - -/** Sends a message to all peers */ -struct send_always -{ - typedef void return_type; - - PackedMessage::pointer const& msg; - - send_always(PackedMessage::pointer const& m) - : msg(m) - { } - - void operator()(Peer::ref peer) const - { - peer->sendPacket (msg, false); - } -}; - -//------------------------------------------------------------------------------ - -/** Sends a message to match peers */ -template -struct send_if_pred -{ - typedef void return_type; - - PackedMessage::pointer const& msg; - Predicate const& predicate; - - send_if_pred(PackedMessage::pointer const& m, Predicate const& p) - : msg(m), predicate(p) - { } - - void operator()(Peer::ref peer) const - { - if (predicate (peer)) - peer->sendPacket (msg, false); - } -}; - -/** Helper function to aid in type deduction */ -template -send_if_pred send_if ( - PackedMessage::pointer const& m, - Predicate const &f) -{ - return send_if_pred(m, f); -} - -//------------------------------------------------------------------------------ - -/** Sends a message to non-matching peers */ -template -struct send_if_not_pred -{ - typedef void return_type; - - PackedMessage::pointer const& msg; - Predicate const& predicate; - - send_if_not_pred(PackedMessage::pointer const& m, Predicate const& p) - : msg(m), predicate(p) - { } - - void operator()(Peer::ref peer) const - { - if (!predicate (peer)) - peer->sendPacket (msg, false); - } -}; - -/** Helper function to aid in type deduction */ -template -send_if_not_pred send_if_not ( - PackedMessage::pointer const& m, - Predicate const &f) -{ - return send_if_not_pred(m, f); -} - -//------------------------------------------------------------------------------ - -/** Select the specific peer */ -struct match_peer -{ - Peer const* matchPeer; - - match_peer (Peer const* match = nullptr) - : matchPeer (match) - { } - - bool operator() (Peer::ref peer) const - { - if(matchPeer && (peer.get () == matchPeer)) - return true; - - return false; - } -}; - -//------------------------------------------------------------------------------ - -/** Select all peers (except optional excluded) that are in our cluster */ -struct peer_in_cluster -{ - match_peer skipPeer; - - peer_in_cluster (Peer const* skip = nullptr) - : skipPeer (skip) - { } - - bool operator() (Peer::ref peer) const - { - if (skipPeer (peer)) - return false; - - if (!peer->isInCluster ()) - return false; - - return true; - } -}; - -//------------------------------------------------------------------------------ - -/** Select all peers that are in the specified set */ -struct peer_in_set -{ - std::set const& peerSet; - - peer_in_set (std::set const& peers) - : peerSet (peers) - { } - - bool operator() (Peer::ref peer) const - { - if (peerSet.count (peer->getShortId ()) == 0) - return false; - - return true; - } -}; - -} - -#endif diff --git a/src/ripple_overlay/api/make_Overlay.h b/src/ripple_overlay/api/make_Overlay.h new file mode 100644 index 000000000..403010aa0 --- /dev/null +++ b/src/ripple_overlay/api/make_Overlay.h @@ -0,0 +1,62 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_MAKE_OVERLAY_H_INCLUDED +#define RIPPLE_OVERLAY_MAKE_OVERLAY_H_INCLUDED + +#include "Overlay.h" + +#include "../../ripple/resource/api/Manager.h" +#include "../../ripple/sitefiles/api/Manager.h" +#include "../../ripple/common/Resolver.h" + +#include "../../beast/beast/threads/Stoppable.h" +#include "../../beast/modules/beast_core/files/File.h" + +#include +#include + +namespace ripple { + +// VFALCO This is separated so that users of the Overlay interface do not need +// to know about creation details such as asio or ssl. + +/** Creates the implementation of Overlay. + + @param parent + @param resourceManager + @param siteFiles + @param pathToDbFileOrDirectory + @param resolver + @param io_service + @param context +*/ +std::unique_ptr +make_Overlay ( + beast::Stoppable& parent, + Resource::Manager& resourceManager, + SiteFiles::Manager& siteFiles, + beast::File const& pathToDbFileOrDirectory, + Resolver& resolver, + boost::asio::io_service& io_service, + boost::asio::ssl::context& ssl_context); + +} // ripple + +#endif diff --git a/src/ripple_overlay/api/predicates.h b/src/ripple_overlay/api/predicates.h new file mode 100644 index 000000000..f2925e406 --- /dev/null +++ b/src/ripple_overlay/api/predicates.h @@ -0,0 +1,174 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_PREDICATES_H_INCLUDED +#define RIPPLE_OVERLAY_PREDICATES_H_INCLUDED + +#include "Message.h" +#include "Peer.h" + +#include + +namespace ripple { + +/** Sends a message to all peers */ +struct send_always +{ + typedef void return_type; + + Message::pointer const& msg; + + send_always(Message::pointer const& m) + : msg(m) + { } + + void operator()(Peer::ptr const& peer) const + { + peer->sendPacket (msg, false); + } +}; + +//------------------------------------------------------------------------------ + +/** Sends a message to match peers */ +template +struct send_if_pred +{ + typedef void return_type; + + Message::pointer const& msg; + Predicate const& predicate; + + send_if_pred(Message::pointer const& m, Predicate const& p) + : msg(m), predicate(p) + { } + + void operator()(Peer::ptr const& peer) const + { + if (predicate (peer)) + peer->sendPacket (msg, false); + } +}; + +/** Helper function to aid in type deduction */ +template +send_if_pred send_if ( + Message::pointer const& m, + Predicate const &f) +{ + return send_if_pred(m, f); +} + +//------------------------------------------------------------------------------ + +/** Sends a message to non-matching peers */ +template +struct send_if_not_pred +{ + typedef void return_type; + + Message::pointer const& msg; + Predicate const& predicate; + + send_if_not_pred(Message::pointer const& m, Predicate const& p) + : msg(m), predicate(p) + { } + + void operator()(Peer::ptr const& peer) const + { + if (!predicate (peer)) + peer->sendPacket (msg, false); + } +}; + +/** Helper function to aid in type deduction */ +template +send_if_not_pred send_if_not ( + Message::pointer const& m, + Predicate const &f) +{ + return send_if_not_pred(m, f); +} + +//------------------------------------------------------------------------------ + +/** Select the specific peer */ +struct match_peer +{ + Peer const* matchPeer; + + match_peer (Peer const* match = nullptr) + : matchPeer (match) + { } + + bool operator() (Peer::ptr const& peer) const + { + if(matchPeer && (peer.get () == matchPeer)) + return true; + + return false; + } +}; + +//------------------------------------------------------------------------------ + +/** Select all peers (except optional excluded) that are in our cluster */ +struct peer_in_cluster +{ + match_peer skipPeer; + + peer_in_cluster (Peer const* skip = nullptr) + : skipPeer (skip) + { } + + bool operator() (Peer::ptr const& peer) const + { + if (skipPeer (peer)) + return false; + + if (!peer->isInCluster ()) + return false; + + return true; + } +}; + +//------------------------------------------------------------------------------ + +/** Select all peers that are in the specified set */ +struct peer_in_set +{ + std::set const& peerSet; + + peer_in_set (std::set const& peers) + : peerSet (peers) + { } + + bool operator() (Peer::ptr const& peer) const + { + if (peerSet.count (peer->getShortId ()) == 0) + return false; + + return true; + } +}; + +} + +#endif diff --git a/src/ripple_overlay/impl/PackedMessage.cpp b/src/ripple_overlay/impl/Message.cpp similarity index 61% rename from src/ripple_overlay/impl/PackedMessage.cpp rename to src/ripple_overlay/impl/Message.cpp index 36280ff62..4c8808718 100644 --- a/src/ripple_overlay/impl/PackedMessage.cpp +++ b/src/ripple_overlay/impl/Message.cpp @@ -17,9 +17,13 @@ */ //============================================================================== +#include "../api/Message.h" + +#include + namespace ripple { -PackedMessage::PackedMessage (::google::protobuf::Message const& message, int type) +Message::Message (::google::protobuf::Message const& message, int type) { unsigned const messageBytes = message.ByteSize (); @@ -31,24 +35,24 @@ PackedMessage::PackedMessage (::google::protobuf::Message const& message, int ty if (messageBytes != 0) { - message.SerializeToArray (&mBuffer [PackedMessage::kHeaderBytes], messageBytes); + message.SerializeToArray (&mBuffer [Message::kHeaderBytes], messageBytes); #ifdef BEAST_DEBUG - //Log::out() << "PackedMessage: type=" << type << ", datalen=" << msg_size; + //Log::out() << "Message: type=" << type << ", datalen=" << msg_size; #endif } } -bool PackedMessage::operator== (PackedMessage const& other) const +bool Message::operator== (Message const& other) const { return mBuffer == other.mBuffer; } -unsigned PackedMessage::getLength (std::vector const& buf) +unsigned Message::getLength (std::vector const& buf) { unsigned result; - if (buf.size () >= PackedMessage::kHeaderBytes) + if (buf.size () >= Message::kHeaderBytes) { result = buf [0]; result <<= 8; @@ -66,9 +70,9 @@ unsigned PackedMessage::getLength (std::vector const& buf) return result; } -int PackedMessage::getType (std::vector const& buf) +int Message::getType (std::vector const& buf) { - if (buf.size () < PackedMessage::kHeaderBytes) + if (buf.size () < Message::kHeaderBytes) return 0; int ret = buf[4]; @@ -77,15 +81,15 @@ int PackedMessage::getType (std::vector const& buf) return ret; } -void PackedMessage::encodeHeader (unsigned size, int type) +void Message::encodeHeader (unsigned size, int type) { - assert (mBuffer.size () >= PackedMessage::kHeaderBytes); - mBuffer[0] = static_cast ((size >> 24) & 0xFF); - mBuffer[1] = static_cast ((size >> 16) & 0xFF); - mBuffer[2] = static_cast ((size >> 8) & 0xFF); - mBuffer[3] = static_cast (size & 0xFF); - mBuffer[4] = static_cast ((type >> 8) & 0xFF); - mBuffer[5] = static_cast (type & 0xFF); + assert (mBuffer.size () >= Message::kHeaderBytes); + mBuffer[0] = static_cast ((size >> 24) & 0xFF); + mBuffer[1] = static_cast ((size >> 16) & 0xFF); + mBuffer[2] = static_cast ((size >> 8) & 0xFF); + mBuffer[3] = static_cast (size & 0xFF); + mBuffer[4] = static_cast ((type >> 8) & 0xFF); + mBuffer[5] = static_cast (type & 0xFF); } } diff --git a/src/ripple_overlay/impl/MessageStream.h b/src/ripple_overlay/impl/MessageStream.h new file mode 100644 index 000000000..008f05d5c --- /dev/null +++ b/src/ripple_overlay/impl/MessageStream.h @@ -0,0 +1,43 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED +#define RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED + +#include + +namespace ripple { + +/** Turns blocks of incoming data into protocol messages. */ +class MessageStream +{ +private: + std::size_t m_bytes; + std::vector m_buffer; + +public: + void + write (void const* buffer, std::size_t bytes) + { + } +}; + +} // ripple + +#endif diff --git a/src/ripple_overlay/impl/OverlayImpl.cpp b/src/ripple_overlay/impl/OverlayImpl.cpp new file mode 100644 index 000000000..ab7959673 --- /dev/null +++ b/src/ripple_overlay/impl/OverlayImpl.cpp @@ -0,0 +1,598 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "OverlayImpl.h" +#include "PeerDoor.h" +#include "PeerImp.h" + +#include "../../beast/beast/ByteOrder.h" + +namespace ripple { + +SETUP_LOG (Peer) + +class PeersLog; +template <> char const* LogPartition::getPartitionName () { return "Overlay"; } + +class PeerFinderLog; +template <> char const* LogPartition::getPartitionName () { return "PeerFinder"; } + +class NameResolverLog; +template <> char const* LogPartition::getPartitionName () { return "NameResolver"; } + +/** Calls a function during static initialization. */ +struct static_call +{ + // Function must be callable as + // void f (void) const + // + template + static_call (Function const& f) + { + f (); + } +}; + +static static_call init_PeerFinderLog (&LogPartition::get ); +static static_call init_NameResolverLog (&LogPartition::get ); + +//------------------------------------------------------------------------------ + +/** A functor to visit all active peers and retrieve their JSON data */ +struct get_peer_json +{ + typedef Json::Value return_type; + + Json::Value json; + + get_peer_json () + { } + + void operator() (Peer::ptr const& peer) + { + json.append (peer->json ()); + } + + Json::Value operator() () + { + return json; + } +}; + +//------------------------------------------------------------------------------ + +OverlayImpl::OverlayImpl (Stoppable& parent, + Resource::Manager& resourceManager, + SiteFiles::Manager& siteFiles, + beast::File const& pathToDbFileOrDirectory, + Resolver& resolver, + boost::asio::io_service& io_service, + boost::asio::ssl::context& ssl_context) + : Overlay (parent) + , m_child_count (1) + , m_journal (LogPartition::getJournal ()) + , m_resourceManager (resourceManager) + , m_peerFinder (add (PeerFinder::Manager::New ( + *this, + siteFiles, + pathToDbFileOrDirectory, + *this, + get_seconds_clock (), + LogPartition::getJournal ()))) + , m_io_service (io_service) + , m_ssl_context (ssl_context) + , m_resolver (resolver) +{ +} + +OverlayImpl::~OverlayImpl () +{ + // Block until dependent objects have been destroyed. + // This is just to catch improper use of the Stoppable API. + // + std::unique_lock lock (m_mutex); + m_cond.wait (lock, [this] { + return this->m_child_count == 0; }); +} + +void +OverlayImpl::accept (bool proxyHandshake, socket_type&& socket) +{ + // An error getting an endpoint means the connection closed. + // Just do nothing and the socket will be closed by the caller. + boost::system::error_code ec; + auto const local_endpoint_native (socket.local_endpoint (ec)); + if (ec) + return; + auto const remote_endpoint_native (socket.remote_endpoint (ec)); + if (ec) + return; + + auto const local_endpoint ( + beast::IPAddressConversion::from_asio (local_endpoint_native)); + auto const remote_endpoint ( + beast::IPAddressConversion::from_asio (remote_endpoint_native)); + + PeerFinder::Slot::ptr const slot (m_peerFinder->new_inbound_slot ( + local_endpoint, remote_endpoint)); + + if (slot == nullptr) + return; + + MultiSocket::Flag flags ( + MultiSocket::Flag::server_role | MultiSocket::Flag::ssl_required); + + if (proxyHandshake) + flags = flags.with (MultiSocket::Flag::proxy); + + PeerImp::ptr const peer (boost::make_shared ( + std::move (socket), remote_endpoint, *this, m_resourceManager, + *m_peerFinder, slot, m_ssl_context, flags)); + + { + std::lock_guard lock (m_mutex); + { + std::pair const result ( + m_peers.emplace (slot, peer)); + assert (result.second); + } + ++m_child_count; + + // This has to happen while holding the lock, + // otherwise the socket might not be canceled during a stop. + peer->start (); + } +} + +void +OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint) +{ + if (isStopping()) + { + m_journal.debug << + "Skipping " << remote_endpoint << + " connect on stop"; + return; + } + + PeerFinder::Slot::ptr const slot ( + m_peerFinder->new_outbound_slot (remote_endpoint)); + + if (slot == nullptr) + return; + + MultiSocket::Flag const flags ( + MultiSocket::Flag::client_role | MultiSocket::Flag::ssl); + + PeerImp::ptr const peer (boost::make_shared ( + remote_endpoint, m_io_service, *this, m_resourceManager, + *m_peerFinder, slot, m_ssl_context, flags)); + + { + std::lock_guard lock (m_mutex); + { + std::pair const result ( + m_peers.emplace (slot, peer)); + assert (result.second); + } + ++m_child_count; + + // This has to happen while holding the lock, + // otherwise the socket might not be canceled during a stop. + peer->start (); + } +} + +Peer::ShortId +OverlayImpl::next_id() +{ + return ++m_nextShortId; +} + +//-------------------------------------------------------------------------- + +// Check for the stopped condition +// Caller must hold the mutex +void +OverlayImpl::check_stopped () +{ + // To be stopped, child Stoppable objects must be stopped + // and the count of dependent objects must be zero + if (areChildrenStopped () && m_child_count == 0) + { + m_cond.notify_all (); + m_journal.info << + "Stopped."; + stopped (); + } +} + +// Decrement the count of dependent objects +// Caller must hold the mutex +void +OverlayImpl::release () +{ + if (--m_child_count == 0) + check_stopped (); +} + +void +OverlayImpl::remove (PeerFinder::Slot::ptr const& slot) +{ + std::lock_guard lock (m_mutex); + + PeersBySlot::iterator const iter (m_peers.find (slot)); + assert (iter != m_peers.end ()); + m_peers.erase (iter); + + release(); +} + +//-------------------------------------------------------------------------- +// +// PeerFinder::Callback +// +//-------------------------------------------------------------------------- + +void +OverlayImpl::connect (std::vector const& list) +{ + for (std::vector ::const_iterator iter (list.begin()); + iter != list.end(); ++iter) + connect (*iter); +} + +void +OverlayImpl::activate (PeerFinder::Slot::ptr const& slot) +{ + m_journal.trace << + "Activate " << slot->remote_endpoint(); + + std::lock_guard lock (m_mutex); + + PeersBySlot::iterator const iter (m_peers.find (slot)); + assert (iter != m_peers.end ()); + PeerImp::ptr const peer (iter->second.lock()); + assert (peer != nullptr); + peer->activate (); +} + +void +OverlayImpl::send (PeerFinder::Slot::ptr const& slot, + std::vector const& endpoints) +{ + typedef std::vector List; + protocol::TMEndpoints tm; + for (List::const_iterator iter (endpoints.begin()); + iter != endpoints.end(); ++iter) + { + PeerFinder::Endpoint const& ep (*iter); + protocol::TMEndpoint& tme (*tm.add_endpoints()); + if (ep.address.is_v4()) + tme.mutable_ipv4()->set_ipv4( + beast::toNetworkByteOrder (ep.address.to_v4().value)); + else + tme.mutable_ipv4()->set_ipv4(0); + tme.mutable_ipv4()->set_ipv4port (ep.address.port()); + + tme.set_hops (ep.hops); + } + + tm.set_version (1); + + Message::pointer msg ( + boost::make_shared ( + tm, protocol::mtENDPOINTS)); + + { + std::lock_guard lock (m_mutex); + PeersBySlot::iterator const iter (m_peers.find (slot)); + assert (iter != m_peers.end ()); + PeerImp::ptr const peer (iter->second.lock()); + assert (peer != nullptr); + peer->sendPacket (msg, false); + } +} + +void +OverlayImpl::disconnect (PeerFinder::Slot::ptr const& slot, bool graceful) +{ + if (m_journal.trace) m_journal.trace << + "Disconnect " << slot->remote_endpoint () << + (graceful ? "gracefully" : ""); + + std::lock_guard lock (m_mutex); + + PeersBySlot::iterator const iter (m_peers.find (slot)); + assert (iter != m_peers.end ()); + PeerImp::ptr const peer (iter->second.lock()); + assert (peer != nullptr); + peer->close (graceful); + //peer->detach ("disc", false); +} + +//-------------------------------------------------------------------------- +// +// Stoppable +// +//-------------------------------------------------------------------------- + +void +OverlayImpl::onPrepare () +{ + PeerFinder::Config config; + + if (getConfig ().PEERS_MAX != 0) + config.maxPeers = getConfig ().PEERS_MAX; + + config.outPeers = config.calcOutPeers(); + + config.wantIncoming = + (! getConfig ().PEER_PRIVATE) && + (getConfig().peerListeningPort != 0); + + // if it's a private peer or we are running as standalone + // automatic connections would defeat the purpose. + config.autoConnect = + !getConfig().RUN_STANDALONE && + !getConfig().PEER_PRIVATE; + + config.listeningPort = getConfig().peerListeningPort; + + config.features = ""; + + // Enforce business rules + config.applyTuning (); + + m_peerFinder->setConfig (config); + + auto bootstrapIps (getConfig ().IPS); + + // If no IPs are specified, use the Ripple Labs round robin + // pool to get some servers to insert into the boot cache. + if (bootstrapIps.empty ()) + bootstrapIps.push_back ("r.ripple.com 51235"); + + if (!bootstrapIps.empty ()) + { + m_resolver.resolve (bootstrapIps, + [this]( + std::string const& name, + std::vector const& addresses) + { + std::vector ips; + + for (auto const& addr : addresses) + ips.push_back (to_string (addr)); + + std::string const base ("config: "); + + if (!ips.empty ()) + m_peerFinder->addFallbackStrings (base + name, ips); + }); + } + + // Add the ips_fixed from the rippled.cfg file + if (! getConfig ().RUN_STANDALONE && !getConfig ().IPS_FIXED.empty ()) + { + m_resolver.resolve (getConfig ().IPS_FIXED, + [this]( + std::string const& name, + std::vector const& addresses) + { + if (!addresses.empty ()) + m_peerFinder->addFixedPeer (name, addresses); + }); + } + + // Configure the peer doors, which allow the server to accept incoming + // peer connections: + if (! getConfig ().RUN_STANDALONE) + { + m_doorDirect = make_PeerDoor ( + PeerDoor::sslRequired, + *this, + getConfig ().PEER_IP, + getConfig ().peerListeningPort, + m_io_service); + + if (getConfig ().peerPROXYListeningPort != 0) + { + m_doorProxy = make_PeerDoor ( + PeerDoor::sslAndPROXYRequired, + *this, + getConfig ().PEER_IP, + getConfig ().peerPROXYListeningPort, + m_io_service); + } + } +} + +void +OverlayImpl::onStart () +{ +} + +/** Close all peer connections. + If `graceful` is true then active + Requirements: + Caller must hold the mutex. +*/ +void +OverlayImpl::close_all (bool graceful) +{ + for (auto const& entry : m_peers) + { + PeerImp::ptr const peer (entry.second.lock()); + + // VFALCO The only case where the weak_ptr is expired should be if + // ~PeerImp is pre-empted before it calls m_peers.remove() + // + if (peer != nullptr) + peer->close (graceful); + } +} + +void +OverlayImpl::onStop () +{ + if (m_doorDirect) + m_doorDirect->stop(); + if (m_doorProxy) + m_doorProxy->stop(); + + std::lock_guard lock (m_mutex); + // Take off the extra count we added in the constructor + release(); + + close_all (false); +} + +void +OverlayImpl::onChildrenStopped () +{ + std::lock_guard lock (m_mutex); + check_stopped (); +} + +//-------------------------------------------------------------------------- +// +// PropertyStream +// +//-------------------------------------------------------------------------- + +void +OverlayImpl::onWrite (beast::PropertyStream::Map& stream) +{ +} + +//-------------------------------------------------------------------------- +/** A peer has connected successfully + This is called after the peer handshake has been completed and during + peer activation. At this point, the peer address and the public key + are known. +*/ +void +OverlayImpl::onPeerActivated (Peer::ptr const& peer) +{ + std::lock_guard lock (m_mutex); + + // Now track this peer + { + auto const result (m_shortIdMap.emplace ( + std::piecewise_construct, + std::make_tuple (peer->getShortId()), + std::make_tuple (peer))); + assert(result.second); + } + + { + auto const result (m_publicKeyMap.emplace ( + boost::unordered::piecewise_construct, + boost::make_tuple (peer->getNodePublic()), + boost::make_tuple (peer))); + assert(result.second); + } + + m_journal.debug << + "activated " << peer->getRemoteAddress() << + " (" << peer->getShortId() << + ":" << RipplePublicKey(peer->getNodePublic()) << ")"; + + // We just accepted this peer so we have non-zero active peers + assert(size() != 0); +} + +/** A peer is being disconnected + This is called during the disconnection of a known, activated peer. It + will not be called for outbound peer connections that don't succeed or + for connections of peers that are dropped prior to being activated. +*/ +void +OverlayImpl::onPeerDisconnect (Peer::ptr const& peer) +{ + std::lock_guard lock (m_mutex); + m_shortIdMap.erase (peer->getShortId ()); + m_publicKeyMap.erase (peer->getNodePublic ()); +} + +/** The number of active peers on the network + Active peers are only those peers that have completed the handshake + and are running the Ripple protocol. +*/ +std::size_t +OverlayImpl::size () +{ + std::lock_guard lock (m_mutex); + return m_publicKeyMap.size (); +} + +// Returns information on verified peers. +Json::Value +OverlayImpl::json () +{ + return foreach (get_peer_json()); +} + +Overlay::PeerSequence +OverlayImpl::getActivePeers () +{ + Overlay::PeerSequence ret; + + std::lock_guard lock (m_mutex); + + ret.reserve (m_publicKeyMap.size ()); + + BOOST_FOREACH (PeerByPublicKey::value_type const& pair, m_publicKeyMap) + { + assert (!!pair.second); + ret.push_back (pair.second); + } + + return ret; +} + +Peer::ptr +OverlayImpl::findPeerByShortID (Peer::ShortId const& id) +{ + std::lock_guard lock (m_mutex); + PeerByShortId::iterator const iter ( + m_shortIdMap.find (id)); + if (iter != m_shortIdMap.end ()) + return iter->second; + return Peer::ptr(); +} + +//------------------------------------------------------------------------------ + +std::unique_ptr +make_Overlay ( + beast::Stoppable& parent, + Resource::Manager& resourceManager, + SiteFiles::Manager& siteFiles, + beast::File const& pathToDbFileOrDirectory, + Resolver& resolver, + boost::asio::io_service& io_service, + boost::asio::ssl::context& ssl_context) +{ + return std::make_unique (parent, resourceManager, siteFiles, + pathToDbFileOrDirectory, resolver, io_service, ssl_context); +} + +} diff --git a/src/ripple_overlay/impl/OverlayImpl.h b/src/ripple_overlay/impl/OverlayImpl.h new file mode 100644 index 000000000..b6f4d359e --- /dev/null +++ b/src/ripple_overlay/impl/OverlayImpl.h @@ -0,0 +1,226 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED +#define RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED + +#include "../api/Overlay.h" + +#include "../../ripple/common/Resolver.h" +#include "../../ripple/common/seconds_clock.h" +#include "../../ripple/peerfinder/api/Callback.h" +#include "../../ripple/peerfinder/api/Manager.h" +#include "../../ripple/resource/api/Manager.h" + +#include +#include +#include + +#include "../../beast/beast/cxx14/memory.h" // +#include +#include +#include +#include + +namespace ripple { + +class PeerDoor; +class PeerImp; + +class OverlayImpl + : public Overlay + , public PeerFinder::Callback +{ +public: + typedef boost::asio::ip::tcp::socket socket_type; + + typedef std::unordered_map > PeersBySlot; + + typedef boost::unordered_map < + RippleAddress, Peer::ptr> PeerByPublicKey; + + typedef std::unordered_map < + Peer::ShortId, Peer::ptr> PeerByShortId; + + std::recursive_mutex m_mutex; + + // Blocks us until dependent objects have been destroyed + std::condition_variable_any m_cond; + + // Number of dependencies that must be destroyed before we can stop + std::size_t m_child_count; + + beast::Journal m_journal; + Resource::Manager& m_resourceManager; + + std::unique_ptr m_peerFinder; + + boost::asio::io_service& m_io_service; + boost::asio::ssl::context& m_ssl_context; + + /** Associates slots to peers. */ + PeersBySlot m_peers; + + /** Tracks peers by their public key */ + PeerByPublicKey m_publicKeyMap; + + /** Tracks peers by their session ID */ + PeerByShortId m_shortIdMap; + + /** The peer door for regular SSL connections */ + std::unique_ptr m_doorDirect; + + /** The peer door for proxy connections */ + std::unique_ptr m_doorProxy; + + /** The resolver we use for peer hostnames */ + Resolver& m_resolver; + + /** Monotically increasing identifiers for peers */ + beast::Atomic m_nextShortId; + + //-------------------------------------------------------------------------- + + OverlayImpl (Stoppable& parent, + Resource::Manager& resourceManager, + SiteFiles::Manager& siteFiles, + beast::File const& pathToDbFileOrDirectory, + Resolver& resolver, + boost::asio::io_service& io_service, + boost::asio::ssl::context& ssl_context); + + ~OverlayImpl (); + + OverlayImpl (OverlayImpl const&) = delete; + OverlayImpl& operator= (OverlayImpl const&) = delete; + + /** Process an incoming connection using the Peer protocol. + The caller transfers ownership of the socket via rvalue move. + @param proxyHandshake `true` If a PROXY handshake is required. + @param socket A socket in the accepted state. + */ + void + accept (bool proxyHandshake, + socket_type&& socket); + + void + connect (beast::IP::Endpoint const& remote_endpoint); + + Peer::ShortId + next_id(); + + //-------------------------------------------------------------------------- + + void + check_stopped (); + + void + release (); + + void + remove (PeerFinder::Slot::ptr const& slot); + + // + // PeerFinder::Callback + // + + void + connect (std::vector const& list); + + void + activate (PeerFinder::Slot::ptr const& slot); + + void + send (PeerFinder::Slot::ptr const& slot, + std::vector const& endpoints); + + void + disconnect (PeerFinder::Slot::ptr const& slot, bool graceful); + + // + // Stoppable + // + + void + onPrepare () override; + + void + onStart () override; + + /** Close all peer connections. + If `graceful` is true then active + Requirements: + Caller must hold the mutex. + */ + void + close_all (bool graceful); + + void + onStop () override; + + void + onChildrenStopped () override; + + // + // PropertyStream + // + + void + onWrite (beast::PropertyStream::Map& stream); + + //-------------------------------------------------------------------------- + + /** A peer has connected successfully + This is called after the peer handshake has been completed and during + peer activation. At this point, the peer address and the public key + are known. + */ + void + onPeerActivated (Peer::ptr const& peer); + + /** A peer is being disconnected + This is called during the disconnection of a known, activated peer. It + will not be called for outbound peer connections that don't succeed or + for connections of peers that are dropped prior to being activated. + */ + void + onPeerDisconnect (Peer::ptr const& peer); + + /** The number of active peers on the network + Active peers are only those peers that have completed the handshake + and are running the Ripple protocol. + */ + std::size_t + size (); + + // Returns information on verified peers. + Json::Value + json (); + + Overlay::PeerSequence + getActivePeers (); + + Peer::ptr + findPeerByShortID (Peer::ShortId const& id); +}; + +} // ripple + +#endif diff --git a/src/ripple_overlay/impl/PeerDoor.cpp b/src/ripple_overlay/impl/PeerDoor.cpp index 18d9b3b2d..9a82e0672 100644 --- a/src/ripple_overlay/impl/PeerDoor.cpp +++ b/src/ripple_overlay/impl/PeerDoor.cpp @@ -17,6 +17,7 @@ */ //============================================================================== +#include "OverlayImpl.h" #include "PeerDoor.h" namespace ripple { @@ -27,16 +28,24 @@ class PeerDoorImp : public PeerDoor , public beast::LeakChecked { +private: + OverlayImpl& m_overlay; + beast::Journal m_journal; + Kind m_kind; + boost::asio::ip::tcp::acceptor m_acceptor; + boost::asio::deadline_timer m_acceptDelay; + NativeSocketType m_socket; + public: - PeerDoorImp (Kind kind, Peers& peers, - boost::asio::ip::tcp::endpoint const &ep, - boost::asio::io_service& io_service) - : PeerDoor (static_cast(peers)) - , m_peers (peers) + PeerDoorImp (Kind kind, OverlayImpl& overlay, + boost::asio::ip::tcp::endpoint const &ep, + boost::asio::io_service& io_service) + : m_overlay (overlay) , m_journal (LogPartition::getJournal ()) , m_kind (kind) , m_acceptor (io_service, ep) , m_acceptDelay (io_service) + , m_socket (io_service) { m_journal.info << "Listening on " << @@ -47,8 +56,18 @@ public: async_accept (); } - ~PeerDoorImp () + void + stop() { + { + boost::system::error_code ec; + m_acceptDelay.cancel (ec); + } + + { + boost::system::error_code ec; + m_acceptor.cancel (ec); + } } //-------------------------------------------------------------------------- @@ -57,14 +76,9 @@ public: // void async_accept () { - boost::shared_ptr socket ( - boost::make_shared ( - m_acceptor.get_io_service())); - - m_acceptor.async_accept (*socket, + m_acceptor.async_accept (m_socket, boost::bind (&PeerDoorImp::handleAccept, this, - boost::asio::placeholders::error, - socket)); + boost::asio::placeholders::error)); } //-------------------------------------------------------------------------- @@ -73,7 +87,7 @@ public: // void handleTimer (boost::system::error_code ec) { - if (ec == boost::asio::error::operation_aborted || isStopping ()) + if (ec == boost::asio::error::operation_aborted) return; async_accept (); @@ -81,10 +95,9 @@ public: // Called when the accept socket wait completes // - void handleAccept (boost::system::error_code ec, - boost::shared_ptr const& socket) + void handleAccept (boost::system::error_code ec) { - if (ec == boost::asio::error::operation_aborted || isStopping ()) + if (ec == boost::asio::error::operation_aborted) return; bool delay = false; @@ -93,7 +106,7 @@ public: { bool const proxyHandshake (m_kind == sslAndPROXYRequired); - m_peers.accept (proxyHandshake, socket); + m_overlay.accept (proxyHandshake, std::move(m_socket)); } else { @@ -103,6 +116,8 @@ public: m_journal.info << "Error " << ec; } + m_socket.close(ec); + if (delay) { m_acceptDelay.expires_from_now (boost::posix_time::milliseconds (500)); @@ -114,43 +129,13 @@ public: async_accept (); } } - - //-------------------------------------------------------------------------- - - void onStop () - { - { - boost::system::error_code ec; - m_acceptDelay.cancel (ec); - } - - { - boost::system::error_code ec; - m_acceptor.cancel (ec); - } - - stopped (); - } - -private: - Peers& m_peers; - beast::Journal m_journal; - Kind m_kind; - boost::asio::ip::tcp::acceptor m_acceptor; - boost::asio::deadline_timer m_acceptDelay; }; //------------------------------------------------------------------------------ -PeerDoor::PeerDoor (Stoppable& parent) - : Stoppable ("PeerDoor", parent) -{ -} - -//------------------------------------------------------------------------------ std::unique_ptr -createPeerDoor ( - PeerDoor::Kind kind, Peers& peers, +make_PeerDoor ( + PeerDoor::Kind kind, OverlayImpl& overlay, std::string const& ip, int port, boost::asio::io_service& io_service) { @@ -161,7 +146,7 @@ createPeerDoor ( boost::asio::ip::address ().from_string ( ip.empty () ? "0.0.0.0" : ip), port); - return std::make_unique(kind, peers, ep, io_service); + return std::make_unique(kind, overlay, ep, io_service); } } diff --git a/src/ripple_overlay/impl/PeerDoor.h b/src/ripple_overlay/impl/PeerDoor.h index 87d645172..1011937f7 100644 --- a/src/ripple_overlay/impl/PeerDoor.h +++ b/src/ripple_overlay/impl/PeerDoor.h @@ -20,29 +20,31 @@ #ifndef RIPPLE_PEERDOOR_H_INCLUDED #define RIPPLE_PEERDOOR_H_INCLUDED +#include "OverlayImpl.h" + #include "../../beast/beast/cxx14/memory.h" // namespace ripple { /** Handles incoming connections from peers. */ -class PeerDoor : public beast::Stoppable +class PeerDoor { -protected: - explicit PeerDoor (Stoppable& parent); - public: - virtual ~PeerDoor () { } + virtual ~PeerDoor () = default; enum Kind { sslRequired, sslAndPROXYRequired }; + + virtual + void stop() = 0; }; std::unique_ptr -createPeerDoor ( - PeerDoor::Kind kind, Peers& peers, +make_PeerDoor ( + PeerDoor::Kind kind, OverlayImpl& overlay, std::string const& ip, int port, boost::asio::io_service& io_service); diff --git a/src/ripple_overlay/impl/PeerImp.h b/src/ripple_overlay/impl/PeerImp.h index 39b3a3d5d..f63f43c1d 100644 --- a/src/ripple_overlay/impl/PeerImp.h +++ b/src/ripple_overlay/impl/PeerImp.h @@ -20,10 +20,25 @@ #ifndef RIPPLE_OVERLAY_PEERIMP_H_INCLUDED #define RIPPLE_OVERLAY_PEERIMP_H_INCLUDED +#include "../api/predicates.h" + #include "../../ripple/common/MultiSocket.h" +#include "../../ripple_data/protocol/Protocol.h" +#include "../ripple/validators/ripple_validators.h" +#include "../ripple/peerfinder/ripple_peerfinder.h" +#include "../ripple_app/misc/ProofOfWork.h" +#include "../ripple_app/misc/ProofOfWorkFactory.h" + +// VFALCO This is unfortunate. Comment this out and +// just include what is needed. +#include "../ripple_app/ripple_app.h" + +#include namespace ripple { +typedef boost::asio::ip::tcp::socket NativeSocketType; + class PeerImp; std::string to_string (Peer const& peer); @@ -40,36 +55,10 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer); //------------------------------------------------------------------------------ -struct get_usable_peers -{ - typedef Peers::PeerSequence return_type; - - Peers::PeerSequence usablePeers; - uint256 const& txHash; - Peer const* skip; - - get_usable_peers(uint256 const& hash, Peer const* s) - : txHash(hash), skip(s) - { } - - void operator() (Peer::ref peer) - { - if (peer->hasTxSet (txHash) && (peer.get () != skip)) - usablePeers.push_back (peer); - } - - return_type operator() () - { - return usablePeers; - } -}; - -//------------------------------------------------------------------------------ - class PeerImp : public Peer - , public CountedObject , public boost::enable_shared_from_this + , private beast::LeakChecked { private: /** Time alloted for a peer to send a HELLO message (DEPRECATED) */ @@ -127,7 +116,7 @@ private: return; } - getNativeSocket ().async_connect ( + m_socket->next_layer ().async_connect ( beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress), m_strand.wrap (boost::bind (&PeerImp::onConnect, shared_from_this (), boost::asio::placeholders::error))); @@ -155,7 +144,7 @@ public: typedef boost::shared_ptr ptr; - boost::shared_ptr m_shared_socket; + NativeSocketType m_owned_socket; beast::Journal m_journal; @@ -173,7 +162,7 @@ public: // Resource::Manager& m_resourceManager; PeerFinder::Manager& m_peerFinder; - Peers& m_peers; + OverlayImpl& m_overlay; bool m_inbound; std::unique_ptr m_socket; @@ -205,8 +194,8 @@ public: boost::asio::deadline_timer m_timer; std::vector m_readBuffer; - std::list mSendQ; - PackedMessage::pointer mSendingPacket; + std::list mSendQ; + Message::pointer mSendingPacket; protocol::TMStatusChange mLastStatus; protocol::TMHello mHello; @@ -221,31 +210,31 @@ public: //-------------------------------------------------------------------------- /** New incoming peer from the specified socket */ PeerImp ( - boost::shared_ptr const& socket, + NativeSocketType&& socket, beast::IP::Endpoint remoteAddress, - Peers& peers, + OverlayImpl& overlay, Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags) - : m_shared_socket (socket) + : m_owned_socket (std::move (socket)) , m_journal (LogPartition::getJournal ()) , m_shortId (0) , m_remoteAddress (remoteAddress) , m_resourceManager (resourceManager) , m_peerFinder (peerFinder) - , m_peers (peers) + , m_overlay (overlay) , m_inbound (true) , m_socket (MultiSocket::New ( - *socket, ssl_context, flags.asBits ())) - , m_strand (socket->get_io_service()) + m_owned_socket, ssl_context, flags.asBits ())) + , m_strand (m_owned_socket.get_io_service()) , m_state (stateConnected) , m_detaching (false) , m_clusterNode (false) , m_minLedger (0) , m_maxLedger (0) - , m_timer (socket->get_io_service()) + , m_timer (m_owned_socket.get_io_service()) , m_slot (slot) , m_was_canceled (false) { @@ -260,18 +249,19 @@ public: PeerImp ( beast::IP::Endpoint remoteAddress, boost::asio::io_service& io_service, - Peers& peers, + OverlayImpl& overlay, Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags) - : m_journal (LogPartition::getJournal ()) + : m_owned_socket (io_service) + , m_journal (LogPartition::getJournal ()) , m_shortId (0) , m_remoteAddress (remoteAddress) , m_resourceManager (resourceManager) , m_peerFinder (peerFinder) - , m_peers (peers) + , m_overlay (overlay) , m_inbound (false) , m_socket (MultiSocket::New ( io_service, ssl_context, flags.asBits ())) @@ -287,15 +277,14 @@ public: { } - virtual ~PeerImp () + virtual + ~PeerImp () { - m_peers.remove (m_slot); + m_overlay.remove (m_slot); } - NativeSocketType& getNativeSocket () - { - return m_socket->next_layer (); - } + PeerImp (PeerImp const&) = delete; + PeerImp& operator= (PeerImp const&) = delete; MultiSocket& getStream () { @@ -346,7 +335,7 @@ public: m_peerFinder.on_closed (m_slot); if (m_state == stateActive) - m_peers.onPeerDisconnect (shared_from_this ()); + m_overlay.onPeerDisconnect (shared_from_this ()); m_state = stateGracefulClose; @@ -442,7 +431,9 @@ public: { bassert (m_state == stateHandshaked); m_state = stateActive; - m_peers.onPeerActivated(shared_from_this ()); + bassert(m_shortId == 0); + m_shortId = m_overlay.next_id(); + m_overlay.onPeerActivated(shared_from_this ()); } void start () @@ -461,7 +452,7 @@ public: //-------------------------------------------------------------------------- - void sendPacket (const PackedMessage::pointer& packet, bool onStrand) + void sendPacket (const Message::pointer& packet, bool onStrand) { if (packet) { @@ -486,12 +477,12 @@ public: void sendGetPeers () { // Ask peer for known other peers. - protocol::TMGetPeers getPeers; + protocol::TMGetPeers msg; - getPeers.set_doweneedthis (1); + msg.set_doweneedthis (1); - PackedMessage::pointer packet = boost::make_shared ( - getPeers, protocol::mtGET_PEERS); + Message::pointer packet = boost::make_shared ( + msg, protocol::mtGET_PEERS); sendPacket (packet, true); } @@ -502,6 +493,14 @@ public: detach ("resource"); } + static void charge (boost::weak_ptr & peer, Resource::Charge const& fee) + { + Peer::ptr p (peer.lock()); + + if (p != nullptr) + p->charge (fee); + } + Json::Value json () { Json::Value ret (Json::objectValue); @@ -617,13 +616,6 @@ public: return false; } - void setShortId(Peer::ShortId shortId) - { - bassert((m_shortId == 0) && (shortId != 0)); - - m_shortId = shortId; - } - Peer::ShortId getShortId () const { return m_shortId; @@ -696,7 +688,7 @@ private: if (!mSendQ.empty ()) { - PackedMessage::pointer packet = mSendQ.front (); + Message::pointer packet = mSendQ.front (); if (packet) { @@ -722,7 +714,7 @@ private: return; } - unsigned msg_len = PackedMessage::getLength (m_readBuffer); + unsigned msg_len = Message::getLength (m_readBuffer); // WRITEME: Compare to maximum message length, abort if too large if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0)) @@ -824,7 +816,7 @@ private: void processReadBuffer () { // must not hold peer lock - int type = PackedMessage::getType (m_readBuffer); + int type = Message::getType (m_readBuffer); LoadEvent::autoptr event ( getApp().getJobQueue ().getLoadEventAP (jtPEER, "Peer::read")); @@ -848,7 +840,7 @@ private: return; } - size_t msgLen (m_readBuffer.size () - PackedMessage::kHeaderBytes); + size_t msgLen (m_readBuffer.size () - Message::kHeaderBytes); switch (type) { @@ -857,7 +849,7 @@ private: event->reName ("Peer::hello"); protocol::TMHello msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvHello (msg); else @@ -870,7 +862,7 @@ private: event->reName ("Peer::cluster"); protocol::TMCluster msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvCluster (msg); else @@ -883,7 +875,7 @@ private: event->reName ("Peer::errormessage"); protocol::TMErrorMsg msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvErrorMessage (msg); else @@ -896,7 +888,7 @@ private: event->reName ("Peer::ping"); protocol::TMPing msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvPing (msg); else @@ -909,7 +901,7 @@ private: event->reName ("Peer::getcontacts"); protocol::TMGetContacts msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvGetContacts (msg); else @@ -922,7 +914,7 @@ private: event->reName ("Peer::contact"); protocol::TMContact msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvContact (msg); else @@ -935,7 +927,7 @@ private: event->reName ("Peer::getpeers"); protocol::TMGetPeers msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvGetPeers (msg); else @@ -948,7 +940,7 @@ private: event->reName ("Peer::peers"); protocol::TMPeers msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvPeers (msg); else @@ -961,7 +953,7 @@ private: event->reName ("Peer::endpoints"); protocol::TMEndpoints msg; - if(msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if(msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvEndpoints (msg); else @@ -974,7 +966,7 @@ private: event->reName ("Peer::searchtransaction"); protocol::TMSearchTransaction msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvSearchTransaction (msg); else @@ -987,7 +979,7 @@ private: event->reName ("Peer::getaccount"); protocol::TMGetAccount msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvGetAccount (msg); else @@ -1000,7 +992,7 @@ private: event->reName ("Peer::account"); protocol::TMAccount msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvAccount (msg); else @@ -1013,7 +1005,7 @@ private: event->reName ("Peer::transaction"); protocol::TMTransaction msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvTransaction (msg); else @@ -1026,7 +1018,7 @@ private: event->reName ("Peer::statuschange"); protocol::TMStatusChange msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvStatus (msg); else @@ -1040,7 +1032,7 @@ private: boost::shared_ptr msg ( boost::make_shared ()); - if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvPropose (msg); else @@ -1054,7 +1046,7 @@ private: boost::shared_ptr msg ( boost::make_shared ()); - if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvGetLedger (msg); else @@ -1068,7 +1060,7 @@ private: boost::shared_ptr msg ( boost::make_shared ()); - if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvLedger (msg); else @@ -1081,7 +1073,7 @@ private: event->reName ("Peer::haveset"); protocol::TMHaveTransactionSet msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvHaveTxSet (msg); else @@ -1095,7 +1087,7 @@ private: boost::shared_ptr msg ( boost::make_shared ()); - if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvValidation (msg); else @@ -1108,7 +1100,7 @@ private: { protocol::TM msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], msgLen)) + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recv (msg); else m_journal.warning << "parse error: " << type; @@ -1123,7 +1115,7 @@ private: boost::shared_ptr msg = boost::make_shared (); - if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvGetObjectByHash (msg); else @@ -1136,7 +1128,7 @@ private: event->reName ("Peer::proofofwork"); protocol::TMProofWork msg; - if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], + if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen)) recvProofWork (msg); else @@ -1158,7 +1150,7 @@ private: if (!m_detaching) { m_readBuffer.clear (); - m_readBuffer.resize (PackedMessage::kHeaderBytes); + m_readBuffer.resize (Message::kHeaderBytes); boost::asio::async_read (getStream (), boost::asio::buffer (m_readBuffer), @@ -1171,17 +1163,17 @@ private: void startReadBody (unsigned msg_len) { - // The first PackedMessage::kHeaderBytes bytes of m_readbuf already + // The first Message::kHeaderBytes bytes of m_readbuf already // contains the header. Expand it to fit in the body as well, and // start async read into the body. if (!m_detaching) { - m_readBuffer.resize (PackedMessage::kHeaderBytes + msg_len); + m_readBuffer.resize (Message::kHeaderBytes + msg_len); boost::asio::async_read (getStream (), boost::asio::buffer ( - &m_readBuffer [PackedMessage::kHeaderBytes], msg_len), + &m_readBuffer [Message::kHeaderBytes], msg_len), m_strand.wrap (boost::bind ( &PeerImp::handleReadBody, boost::static_pointer_cast (shared_from_this ()), @@ -1190,7 +1182,7 @@ private: } } - void sendPacketForce (const PackedMessage::pointer& packet) + void sendPacketForce (const Message::pointer& packet) { // must be on IO strand if (!m_detaching) @@ -1339,7 +1331,7 @@ private: h.set_ledgerprevious (hash.begin (), hash.GetSerializeSize ()); } - PackedMessage::pointer packet = boost::make_shared ( + Message::pointer packet = boost::make_shared ( h, protocol::mtHELLO); sendPacket (packet, true); @@ -1597,7 +1589,7 @@ private: isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", BIND_TYPE ( - &PeerImp::checkValidation, P_1, &m_peers, val, + &PeerImp::checkValidation, P_1, &m_overlay, val, isTrusted, m_clusterNode, packet, boost::weak_ptr (shared_from_this ()))); } @@ -1638,7 +1630,7 @@ private: // response with some data here anyways, and send if non-empty. sendPacket ( - boost::make_shared (peers, protocol::mtPEERS), + boost::make_shared (peers, protocol::mtPEERS), true); #endif } @@ -1761,7 +1753,7 @@ private: " of " << packet.objects_size () << " for " << to_string (this); sendPacket ( - boost::make_shared (reply, protocol::mtGET_OBJECTS), + boost::make_shared (reply, protocol::mtGET_OBJECTS), true); } else @@ -1824,7 +1816,7 @@ private: if (packet.type () == protocol::TMPing::ptPING) { packet.set_type (protocol::TMPing::ptPONG); - sendPacket (boost::make_shared (packet, protocol::mtPING), true); + sendPacket (boost::make_shared (packet, protocol::mtPING), true); } } @@ -1847,7 +1839,7 @@ private: void recvGetLedger (boost::shared_ptr const& packet) { getApp().getJobQueue().addJob (jtPACK, "recvGetLedger", - std::bind (&sGetLedger, boost::weak_ptr (shared_from_this ()), packet)); + std::bind (&sGetLedger, boost::weak_ptr (shared_from_this ()), packet)); } void recvLedger (boost::shared_ptr const& packet_ptr) @@ -1862,12 +1854,12 @@ private: if (packet.has_requestcookie ()) { - Peer::pointer target = m_peers.findPeerByShortID (packet.requestcookie ()); + Peer::ptr target = m_overlay.findPeerByShortID (packet.requestcookie ()); if (target) { packet.clear_requestcookie (); - target->sendPacket (boost::make_shared (packet, protocol::mtLEDGER_DATA), false); + target->sendPacket (boost::make_shared (packet, protocol::mtLEDGER_DATA), false); } else { @@ -2067,7 +2059,7 @@ private: getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", BIND_TYPE ( - &PeerImp::checkPropose, P_1, &m_peers, packet, proposal, consensusLCL, + &PeerImp::checkPropose, P_1, &m_overlay, packet, proposal, consensusLCL, m_nodePublicKey, boost::weak_ptr (shared_from_this ()), m_clusterNode)); } @@ -2228,7 +2220,31 @@ private: { m_journal.debug << "Trying to route TX set request"; - Peers::PeerSequence usablePeers (m_peers.foreach ( + struct get_usable_peers + { + typedef Overlay::PeerSequence return_type; + + Overlay::PeerSequence usablePeers; + uint256 const& txHash; + Peer const* skip; + + get_usable_peers(uint256 const& hash, Peer const* s) + : txHash(hash), skip(s) + { } + + void operator() (Peer::ptr const& peer) + { + if (peer->hasTxSet (txHash) && (peer.get () != skip)) + usablePeers.push_back (peer); + } + + return_type operator() () + { + return usablePeers; + } + }; + + Overlay::PeerSequence usablePeers (m_overlay.foreach ( get_usable_peers (txHash, this))); if (usablePeers.empty ()) @@ -2237,10 +2253,10 @@ private: return; } - Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()]; + Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; packet.set_requestcookie (getShortId ()); selectedPeer->sendPacket ( - boost::make_shared (packet, protocol::mtGET_LEDGER), + boost::make_shared (packet, protocol::mtGET_LEDGER), false); return; } @@ -2297,9 +2313,9 @@ private: if (packet.has_ledgerseq ()) seq = packet.ledgerseq (); - Peers::PeerSequence peerList = m_peers.getActivePeers (); - Peers::PeerSequence usablePeers; - BOOST_FOREACH (Peer::ref peer, peerList) + Overlay::PeerSequence peerList = m_overlay.getActivePeers (); + Overlay::PeerSequence usablePeers; + BOOST_FOREACH (Peer::ptr const& peer, peerList) { if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this)) usablePeers.push_back (peer); @@ -2311,10 +2327,10 @@ private: return; } - Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()]; + Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; packet.set_requestcookie (getShortId ()); selectedPeer->sendPacket ( - boost::make_shared (packet, protocol::mtGET_LEDGER), false); + boost::make_shared (packet, protocol::mtGET_LEDGER), false); m_journal.debug << "Ledger request routed"; return; } @@ -2404,7 +2420,7 @@ private: } } - PackedMessage::pointer oPacket = boost::make_shared (reply, protocol::mtLEDGER_DATA); + Message::pointer oPacket = boost::make_shared (reply, protocol::mtLEDGER_DATA); sendPacket (oPacket, false); return; } @@ -2488,15 +2504,17 @@ private: } } - PackedMessage::pointer oPacket = boost::make_shared (reply, protocol::mtLEDGER_DATA); + Message::pointer oPacket = boost::make_shared (reply, protocol::mtLEDGER_DATA); sendPacket (oPacket, false); } // This is dispatched by the job queue - static void sGetLedger (boost::weak_ptr wPeer, - boost::shared_ptr packet) + static + void + sGetLedger (boost::weak_ptr wPeer, + boost::shared_ptr packet) { - boost::shared_ptr peer = wPeer.lock (); + boost::shared_ptr peer = wPeer.lock (); if (peer) peer->getLedger (*packet); @@ -2584,14 +2602,14 @@ private: } else { - Peer::pointer pptr (peer.lock ()); + Peer::ptr pptr (peer.lock ()); if (pptr) { protocol::TMProofWork reply; reply.set_token (pow->getToken ()); reply.set_response (solution.begin (), solution.size ()); - pptr->sendPacket (boost::make_shared (reply, protocol::mtPROOFOFWORK), false); + pptr->sendPacket (boost::make_shared (reply, protocol::mtPROOFOFWORK), false); } else { @@ -2612,7 +2630,7 @@ private: getApp().getLedgerMaster().getValidLedgerIndex())) { // Transaction has expired getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD); - Peer::charge (peer, Resource::feeUnwantedData); + charge (peer, Resource::feeUnwantedData); return; } @@ -2623,7 +2641,7 @@ private: if (tx->getStatus () == INVALID) { getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); - Peer::charge (peer, Resource::feeInvalidSignature); + charge (peer, Resource::feeInvalidSignature); return; } else @@ -2636,14 +2654,14 @@ private: catch (...) { getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); - Peer::charge (peer, Resource::feeInvalidRequest); + charge (peer, Resource::feeInvalidRequest); } #endif } // Called from our JobQueue - static void checkPropose (Job& job, Peers* pPeers, boost::shared_ptr packet, + static void checkPropose (Job& job, Overlay* pPeers, boost::shared_ptr packet, LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, boost::weak_ptr peer, bool fromCluster) { @@ -2667,10 +2685,10 @@ private: if (!fromCluster && !proposal->checkSign (set.signature ())) { - Peer::pointer p = peer.lock (); + Peer::ptr p = peer.lock (); WriteLog(lsWARNING, Peer) << "proposal with previous ledger fails sig check: " << *p; - Peer::charge (peer, Resource::feeInvalidSignature); + charge (peer, Resource::feeInvalidSignature); return; } else @@ -2705,7 +2723,7 @@ private: proposal->getSuppressionID (), peers, SF_RELAYED)) { pPeers->foreach (send_if_not ( - boost::make_shared (set, protocol::mtPROPOSE_LEDGER), + boost::make_shared (set, protocol::mtPROPOSE_LEDGER), peer_in_set(peers))); } } @@ -2715,7 +2733,7 @@ private: } } - static void checkValidation (Job&, Peers* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, + static void checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, boost::shared_ptr packet, boost::weak_ptr peer) { #ifndef TRUST_NETWORK @@ -2727,12 +2745,12 @@ private: if (!isCluster && !val->isValid (signingHash)) { WriteLog(lsWARNING, Peer) << "Validation is invalid"; - Peer::charge (peer, Resource::feeInvalidRequest); + charge (peer, Resource::feeInvalidRequest); return; } std::string source; - Peer::pointer lp = peer.lock (); + Peer::ptr lp = peer.lock (); if (lp) source = to_string(*lp); @@ -2757,7 +2775,7 @@ private: getApp().getHashRouter ().swapSet (signingHash, peers, SF_RELAYED)) { pPeers->foreach (send_if_not ( - boost::make_shared (*packet, protocol::mtVALIDATION), + boost::make_shared (*packet, protocol::mtVALIDATION), peer_in_set(peers))); } } @@ -2766,7 +2784,7 @@ private: catch (...) { WriteLog(lsTRACE, Peer) << "Exception processing validation"; - Peer::charge (peer, Resource::feeInvalidRequest); + charge (peer, Resource::feeInvalidRequest); } #endif } @@ -2774,18 +2792,13 @@ private: //------------------------------------------------------------------------------ -void Peer::charge (boost::weak_ptr & peer, Resource::Charge const& fee) -{ - Peer::pointer p (peer.lock()); - - if (p != nullptr) - p->charge (fee); -} - const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); //------------------------------------------------------------------------------ -std::string to_string (PeerImp const& peer) + +// to_string should not be used we should just use lexical_cast maybe + +inline std::string to_string (PeerImp const& peer) { if (peer.isInCluster()) return peer.getClusterNodeName(); @@ -2793,19 +2806,19 @@ std::string to_string (PeerImp const& peer) return peer.getRemoteAddress().to_string(); } -std::string to_string (PeerImp const* peer) +inline std::string to_string (PeerImp const* peer) { return to_string (*peer); } -std::ostream& operator<< (std::ostream& os, PeerImp const& peer) +inline std::ostream& operator<< (std::ostream& os, PeerImp const& peer) { os << to_string (peer); return os; } -std::ostream& operator<< (std::ostream& os, PeerImp const* peer) +inline std::ostream& operator<< (std::ostream& os, PeerImp const* peer) { os << to_string (peer); @@ -2814,7 +2827,7 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer) //------------------------------------------------------------------------------ -std::string to_string (Peer const& peer) +inline std::string to_string (Peer const& peer) { if (peer.isInCluster()) return peer.getClusterNodeName(); @@ -2822,19 +2835,19 @@ std::string to_string (Peer const& peer) return peer.getRemoteAddress().to_string(); } -std::string to_string (Peer const* peer) +inline std::string to_string (Peer const* peer) { return to_string (*peer); } -std::ostream& operator<< (std::ostream& os, Peer const& peer) +inline std::ostream& operator<< (std::ostream& os, Peer const& peer) { os << to_string (peer); return os; } -std::ostream& operator<< (std::ostream& os, Peer const* peer) +inline std::ostream& operator<< (std::ostream& os, Peer const* peer) { os << to_string (peer); diff --git a/src/ripple_overlay/impl/Peers.cpp b/src/ripple_overlay/impl/Peers.cpp deleted file mode 100644 index 2f537d97d..000000000 --- a/src/ripple_overlay/impl/Peers.cpp +++ /dev/null @@ -1,637 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include "PeerDoor.h" -#include "PeerImp.h" - -#include "../../ripple/common/seconds_clock.h" -#include "../../beast/beast/ByteOrder.h" - -#include - -#include -#include -#include - -namespace ripple { - -SETUP_LOG (Peer) - -class PeersLog; -template <> char const* LogPartition::getPartitionName () { return "Peers"; } - -class PeerFinderLog; -template <> char const* LogPartition::getPartitionName () { return "PeerFinder"; } - -class NameResolverLog; -template <> char const* LogPartition::getPartitionName () { return "NameResolver"; } - -/** Calls a function during static initialization. */ -struct static_call -{ - // Function must be callable as - // void f (void) const - // - template - static_call (Function const& f) - { - f (); - } -}; - -static static_call init_PeerFinderLog (&LogPartition::get ); -static static_call init_NameResolverLog (&LogPartition::get ); - -//------------------------------------------------------------------------------ - -/** A functor to visit all active peers and retrieve their JSON data */ -struct get_peer_json -{ - typedef Json::Value return_type; - - Json::Value json; - - get_peer_json () - { } - - void operator() (Peer::ref peer) - { - json.append (peer->json ()); - } - - Json::Value operator() () - { - return json; - } -}; - -//------------------------------------------------------------------------------ - -class PeersImp - : public Peers - , public PeerFinder::Callback - , public beast::LeakChecked -{ -public: - typedef std::unordered_map > PeersBySlot; - - typedef boost::unordered_map < - RippleAddress, Peer::pointer> PeerByPublicKey; - - typedef boost::unordered_map < - Peer::ShortId, Peer::pointer> PeerByShortId; - - std::recursive_mutex m_mutex; - - // Blocks us until dependent objects have been destroyed - std::condition_variable_any m_cond; - - // Number of dependencies that must be destroyed before we can stop - std::size_t m_child_count; - - beast::Journal m_journal; - Resource::Manager& m_resourceManager; - - std::unique_ptr m_peerFinder; - - boost::asio::io_service& m_io_service; - boost::asio::ssl::context& m_ssl_context; - - /** Associates slots to peers. */ - PeersBySlot m_peers; - - /** Tracks peers by their public key */ - PeerByPublicKey m_publicKeyMap; - - /** Tracks peers by their session ID */ - PeerByShortId m_shortIdMap; - - /** The peer door for regular SSL connections */ - std::unique_ptr m_doorDirect; - - /** The peer door for proxy connections */ - std::unique_ptr m_doorProxy; - - /** The resolver we use for peer hostnames */ - Resolver& m_resolver; - - /** Monotically increasing identifiers for peers */ - beast::Atomic m_nextShortId; - - //-------------------------------------------------------------------------- - // - // Peers - // - //-------------------------------------------------------------------------- - - PeersImp (Stoppable& parent, - Resource::Manager& resourceManager, - SiteFiles::Manager& siteFiles, - beast::File const& pathToDbFileOrDirectory, - Resolver& resolver, - boost::asio::io_service& io_service, - boost::asio::ssl::context& ssl_context) - : Peers (parent) - , m_child_count (1) - , m_journal (LogPartition::getJournal ()) - , m_resourceManager (resourceManager) - , m_peerFinder (add (PeerFinder::Manager::New ( - *this, - siteFiles, - pathToDbFileOrDirectory, - *this, - get_seconds_clock (), - LogPartition::getJournal ()))) - , m_io_service (io_service) - , m_ssl_context (ssl_context) - , m_resolver (resolver) - { - } - - ~PeersImp () - { - // Block until dependent objects have been destroyed. - // This is just to catch improper use of the Stoppable API. - // - std::unique_lock lock (m_mutex); - m_cond.wait (lock, [this] { - return this->m_child_count == 0; }); - } - - void accept ( - bool proxyHandshake, - boost::shared_ptr const& socket) - { - // An error getting an endpoint means the connection closed. - // Just do nothing and the socket will be closed by the caller. - boost::system::error_code ec; - auto const local_endpoint_native (socket->local_endpoint (ec)); - if (ec) - return; - auto const remote_endpoint_native (socket->remote_endpoint (ec)); - if (ec) - return; - - auto const local_endpoint ( - beast::IPAddressConversion::from_asio (local_endpoint_native)); - auto const remote_endpoint ( - beast::IPAddressConversion::from_asio (remote_endpoint_native)); - - PeerFinder::Slot::ptr const slot (m_peerFinder->new_inbound_slot ( - local_endpoint, remote_endpoint)); - - if (slot == nullptr) - return; - - MultiSocket::Flag flags ( - MultiSocket::Flag::server_role | MultiSocket::Flag::ssl_required); - - if (proxyHandshake) - flags = flags.with (MultiSocket::Flag::proxy); - - PeerImp::ptr const peer (boost::make_shared ( - socket, remote_endpoint, *this, m_resourceManager, *m_peerFinder, - slot, m_ssl_context, flags)); - - { - std::lock_guard lock (m_mutex); - { - std::pair const result ( - m_peers.emplace (slot, peer)); - assert (result.second); - } - ++m_child_count; - - // This has to happen while holding the lock, - // otherwise the socket might not be canceled during a stop. - peer->start (); - } - } - - void connect (beast::IP::Endpoint const& remote_endpoint) - { - if (isStopping()) - { - m_journal.debug << - "Skipping " << remote_endpoint << - " connect on stop"; - return; - } - - PeerFinder::Slot::ptr const slot ( - m_peerFinder->new_outbound_slot (remote_endpoint)); - - if (slot == nullptr) - return; - - MultiSocket::Flag const flags ( - MultiSocket::Flag::client_role | MultiSocket::Flag::ssl); - - PeerImp::ptr const peer (boost::make_shared ( - remote_endpoint, m_io_service, *this, m_resourceManager, - *m_peerFinder, slot, m_ssl_context, flags)); - - { - std::lock_guard lock (m_mutex); - { - std::pair const result ( - m_peers.emplace (slot, peer)); - assert (result.second); - } - ++m_child_count; - - // This has to happen while holding the lock, - // otherwise the socket might not be canceled during a stop. - peer->start (); - } - } - - //-------------------------------------------------------------------------- - - // Check for the stopped condition - // Caller must hold the mutex - void check_stopped () - { - // To be stopped, child Stoppable objects must be stopped - // and the count of dependent objects must be zero - if (areChildrenStopped () && m_child_count == 0) - { - m_cond.notify_all (); - m_journal.info << - "Stopped."; - stopped (); - } - } - - // Decrement the count of dependent objects - // Caller must hold the mutex - void release () - { - if (--m_child_count == 0) - check_stopped (); - } - - void remove (PeerFinder::Slot::ptr const& slot) - { - std::lock_guard lock (m_mutex); - - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - m_peers.erase (iter); - - release(); - } - - //-------------------------------------------------------------------------- - // - // PeerFinder::Callback - // - //-------------------------------------------------------------------------- - - void connect (std::vector const& list) - { - for (std::vector ::const_iterator iter (list.begin()); - iter != list.end(); ++iter) - connect (*iter); - } - - void activate (PeerFinder::Slot::ptr const& slot) - { - m_journal.trace << - "Activate " << slot->remote_endpoint(); - - std::lock_guard lock (m_mutex); - - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - PeerImp::ptr const peer (iter->second.lock()); - assert (peer != nullptr); - peer->activate (); - } - - void send (PeerFinder::Slot::ptr const& slot, - std::vector const& endpoints) - { - typedef std::vector List; - protocol::TMEndpoints tm; - for (List::const_iterator iter (endpoints.begin()); - iter != endpoints.end(); ++iter) - { - PeerFinder::Endpoint const& ep (*iter); - protocol::TMEndpoint& tme (*tm.add_endpoints()); - if (ep.address.is_v4()) - tme.mutable_ipv4()->set_ipv4( - beast::toNetworkByteOrder (ep.address.to_v4().value)); - else - tme.mutable_ipv4()->set_ipv4(0); - tme.mutable_ipv4()->set_ipv4port (ep.address.port()); - - tme.set_hops (ep.hops); - } - - tm.set_version (1); - - PackedMessage::pointer msg ( - boost::make_shared ( - tm, protocol::mtENDPOINTS)); - - { - std::lock_guard lock (m_mutex); - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - PeerImp::ptr const peer (iter->second.lock()); - assert (peer != nullptr); - peer->sendPacket (msg, false); - } - } - - void disconnect (PeerFinder::Slot::ptr const& slot, bool graceful) - { - if (m_journal.trace) m_journal.trace << - "Disconnect " << slot->remote_endpoint () << - (graceful ? "gracefully" : ""); - - std::lock_guard lock (m_mutex); - - PeersBySlot::iterator const iter (m_peers.find (slot)); - assert (iter != m_peers.end ()); - PeerImp::ptr const peer (iter->second.lock()); - assert (peer != nullptr); - peer->close (graceful); - //peer->detach ("disc", false); - } - - //-------------------------------------------------------------------------- - // - // Stoppable - // - //-------------------------------------------------------------------------- - - void onPrepare () - { - PeerFinder::Config config; - - if (getConfig ().PEERS_MAX != 0) - config.maxPeers = getConfig ().PEERS_MAX; - - config.outPeers = config.calcOutPeers(); - - config.wantIncoming = - (! getConfig ().PEER_PRIVATE) && - (getConfig().peerListeningPort != 0); - - // if it's a private peer or we are running as standalone - // automatic connections would defeat the purpose. - config.autoConnect = - !getConfig().RUN_STANDALONE && - !getConfig().PEER_PRIVATE; - - config.listeningPort = getConfig().peerListeningPort; - - config.features = ""; - - // Enforce business rules - config.applyTuning (); - - m_peerFinder->setConfig (config); - - auto bootstrapIps (getConfig ().IPS); - - // If no IPs are specified, use the Ripple Labs round robin - // pool to get some servers to insert into the boot cache. - if (bootstrapIps.empty ()) - bootstrapIps.push_back ("r.ripple.com 51235"); - - if (!bootstrapIps.empty ()) - { - m_resolver.resolve (bootstrapIps, - [this]( - std::string const& name, - std::vector const& addresses) - { - std::vector ips; - - for (auto const& addr : addresses) - ips.push_back (to_string (addr)); - - std::string const base ("config: "); - - if (!ips.empty ()) - m_peerFinder->addFallbackStrings (base + name, ips); - }); - } - - // Add the ips_fixed from the rippled.cfg file - if (! getConfig ().RUN_STANDALONE && !getConfig ().IPS_FIXED.empty ()) - { - m_resolver.resolve (getConfig ().IPS_FIXED, - [this]( - std::string const& name, - std::vector const& addresses) - { - if (!addresses.empty ()) - m_peerFinder->addFixedPeer (name, addresses); - }); - } - - // Configure the peer doors, which allow the server to accept incoming - // peer connections: - if (! getConfig ().RUN_STANDALONE) - { - m_doorDirect = createPeerDoor ( - PeerDoor::sslRequired, - *this, - getConfig ().PEER_IP, - getConfig ().peerListeningPort, - m_io_service); - - if (getConfig ().peerPROXYListeningPort != 0) - { - m_doorProxy = createPeerDoor ( - PeerDoor::sslAndPROXYRequired, - *this, - getConfig ().PEER_IP, - getConfig ().peerPROXYListeningPort, - m_io_service); - } - } - } - - void onStart () - { - } - - /** Close all peer connections. - If `graceful` is true then active - Requirements: - Caller must hold the mutex. - */ - void close_all (bool graceful) - { - for (auto const& entry : m_peers) - { - PeerImp::ptr const peer (entry.second.lock()); - - // VFALCO The only case where the weak_ptr is expired should be if - // ~PeerImp is pre-empted before it calls m_peers.remove() - // - if (peer != nullptr) - peer->close (graceful); - } - } - - void onStop () - { - std::lock_guard lock (m_mutex); - // Take off the extra count we added in the constructor - release(); - - close_all (false); - } - - void onChildrenStopped () - { - std::lock_guard lock (m_mutex); - check_stopped (); - } - - //-------------------------------------------------------------------------- - // - // PropertyStream - // - //-------------------------------------------------------------------------- - - void onWrite (beast::PropertyStream::Map& stream) - { - } - - //-------------------------------------------------------------------------- - /** A peer has connected successfully - This is called after the peer handshake has been completed and during - peer activation. At this point, the peer address and the public key - are known. - */ - void onPeerActivated (Peer::ref peer) - { - // First assign this peer a new short ID - peer->setShortId(++m_nextShortId); - - std::lock_guard lock (m_mutex); - - // Now track this peer - std::pair idResult( - m_shortIdMap.emplace ( - boost::unordered::piecewise_construct, - boost::make_tuple (peer->getShortId()), - boost::make_tuple (peer))); - assert(idResult.second); - - std::pair keyResult( - m_publicKeyMap.emplace ( - boost::unordered::piecewise_construct, - boost::make_tuple (peer->getNodePublic()), - boost::make_tuple (peer))); - assert(keyResult.second); - - m_journal.debug << - "activated " << peer->getRemoteAddress() << - " (" << peer->getShortId() << - ":" << RipplePublicKey(peer->getNodePublic()) << ")"; - - // We just accepted this peer so we have non-zero active peers - assert(size() != 0); - } - - /** A peer is being disconnected - This is called during the disconnection of a known, activated peer. It - will not be called for outbound peer connections that don't succeed or - for connections of peers that are dropped prior to being activated. - */ - void onPeerDisconnect (Peer::ref peer) - { - std::lock_guard lock (m_mutex); - m_shortIdMap.erase (peer->getShortId ()); - m_publicKeyMap.erase (peer->getNodePublic ()); - } - - /** The number of active peers on the network - Active peers are only those peers that have completed the handshake - and are running the Ripple protocol. - */ - std::size_t size () - { - std::lock_guard lock (m_mutex); - return m_publicKeyMap.size (); - } - - // Returns information on verified peers. - Json::Value json () - { - return foreach (get_peer_json()); - } - - Peers::PeerSequence getActivePeers () - { - Peers::PeerSequence ret; - - std::lock_guard lock (m_mutex); - - ret.reserve (m_publicKeyMap.size ()); - - BOOST_FOREACH (PeerByPublicKey::value_type const& pair, m_publicKeyMap) - { - assert (!!pair.second); - ret.push_back (pair.second); - } - - return ret; - } - - Peer::pointer findPeerByShortID (Peer::ShortId const& id) - { - std::lock_guard lock (m_mutex); - PeerByShortId::iterator const iter ( - m_shortIdMap.find (id)); - if (iter != m_shortIdMap.end ()) - return iter->second; - return Peer::pointer(); - } -}; - -//------------------------------------------------------------------------------ - -Peers::~Peers () -{ -} - -Peers* Peers::New ( - Stoppable& parent, - Resource::Manager& resourceManager, - SiteFiles::Manager& siteFiles, - beast::File const& pathToDbFileOrDirectory, - Resolver& resolver, - boost::asio::io_service& io_service, - boost::asio::ssl::context& ssl_context) -{ - return new PeersImp (parent, resourceManager, siteFiles, - pathToDbFileOrDirectory, resolver, io_service, ssl_context); -} - -} diff --git a/src/ripple_overlay/ripple_overlay.cpp b/src/ripple_overlay/ripple_overlay.cpp index 7c74a494c..5d6bad774 100644 --- a/src/ripple_overlay/ripple_overlay.cpp +++ b/src/ripple_overlay/ripple_overlay.cpp @@ -19,17 +19,8 @@ #include "../../BeastConfig.h" -#include "ripple_overlay.h" - -#include "../ripple_app/ripple_app.h" - -#include "../ripple/validators/ripple_validators.h" -#include "../ripple/peerfinder/ripple_peerfinder.h" -#include "../ripple_app/misc/ProofOfWork.h" -#include "../ripple_app/misc/ProofOfWorkFactory.h" - -#include "impl/PackedMessage.cpp" +#include "impl/Message.cpp" +#include "impl/OverlayImpl.cpp" #include "impl/PeerImp.h" #include "impl/PeerDoor.cpp" -#include "impl/Peers.cpp" diff --git a/src/ripple_overlay/ripple_overlay.h b/src/ripple_overlay/ripple_overlay.h index d82445a21..531132cbc 100644 --- a/src/ripple_overlay/ripple_overlay.h +++ b/src/ripple_overlay/ripple_overlay.h @@ -19,11 +19,4 @@ #ifndef RIPPLE_OVERLAY_H_INCLUDED #define RIPPLE_OVERLAY_H_INCLUDED - -#include "../ripple_data/ripple_data.h" - -#include "api/PackedMessage.h" -#include "api/Peer.h" -#include "api/Peers.h" - #endif