Refactor ripple_overlay:

* Use rvalue move to receive accepted sockets
  * Split asio dependent APIs to their own class and file
  * Update documentation
  * Organize code into different files
  * Make some members private
  * Rename things for clarity
This commit is contained in:
Vinnie Falco
2014-04-09 20:38:23 -07:00
parent 898b7eb6f0
commit 62354350a3
54 changed files with 1728 additions and 1387 deletions

View File

@@ -1854,7 +1854,7 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\impl\PackedMessage.cpp">
<ClCompile Include="..\..\src\ripple_overlay\impl\Message.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
@@ -1866,7 +1866,7 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\impl\Peers.cpp">
<ClCompile Include="..\..\src\ripple_overlay\impl\OverlayImpl.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
@@ -2683,12 +2683,15 @@
<ClInclude Include="..\..\src\ripple_net\rpc\RPCErr.h" />
<ClInclude Include="..\..\src\ripple_net\rpc\RPCSub.h" />
<ClInclude Include="..\..\src\ripple_net\rpc\RPCUtil.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\PackedMessage.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\make_Overlay.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\Message.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\Peer.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\Peers.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\Overlay.h" />
<ClInclude Include="..\..\src\ripple_overlay\api\predicates.h" />
<ClInclude Include="..\..\src\ripple_overlay\impl\MessageStream.h" />
<ClInclude Include="..\..\src\ripple_overlay\impl\OverlayImpl.h" />
<ClInclude Include="..\..\src\ripple_overlay\impl\PeerDoor.h" />
<ClInclude Include="..\..\src\ripple_overlay\impl\PeerImp.h" />
<ClInclude Include="..\..\src\ripple_overlay\ripple_overlay.h" />
<ClInclude Include="..\..\src\ripple_rpc\api\ErrorCodes.h" />
<ClInclude Include="..\..\src\ripple_rpc\api\Manager.h" />
<ClInclude Include="..\..\src\ripple_rpc\api\Request.h" />

View File

@@ -1413,12 +1413,6 @@
<ClCompile Include="..\..\src\ripple_overlay\impl\PeerDoor.cpp">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\impl\Peers.cpp">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\impl\PackedMessage.cpp">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\ripple_overlay.cpp">
<Filter>[2] Old Ripple\ripple_overlay</Filter>
</ClCompile>
@@ -1509,15 +1503,18 @@
<ClCompile Include="..\..\src\ripple_app\tx\LocalTxs.cpp">
<Filter>[2] Old Ripple\ripple_app\tx</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\common\tests\cross_offer.test.cpp">
<Filter>[1] Ripple\common\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\book\tests\Quality.test.cpp">
<Filter>[2] Old Ripple\ripple_app\book\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\book\tests\OfferStream.test.cpp">
<Filter>[2] Old Ripple\ripple_app\book\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\impl\Message.cpp">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_overlay\impl\OverlayImpl.cpp">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\RangeSet.h">
@@ -2984,15 +2981,6 @@
<ClInclude Include="..\..\src\ripple_overlay\api\Peer.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\api\Peers.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\api\PackedMessage.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\ripple_overlay.h">
<Filter>[2] Old Ripple\ripple_overlay</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\handout.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
@@ -3105,6 +3093,24 @@
<ClInclude Include="..\..\src\ripple_app\book\Offer.h">
<Filter>[2] Old Ripple\ripple_app\book</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\api\make_Overlay.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\impl\OverlayImpl.h">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\api\predicates.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\impl\MessageStream.h">
<Filter>[2] Old Ripple\ripple_overlay\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\api\Message.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_overlay\api\Overlay.h">
<Filter>[2] Old Ripple\ripple_overlay\api</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -3,12 +3,15 @@
## Introduction
Each _peer_ (a running instance of the **rippled** program) on the Ripple network
maintains multiple TCP/IP connections to other peers (neighbors) who themselves
have neighboring peers. The resulting network is called a _peer to peer overlay
network_, or just [_overlay network_][overlay_network]. Messages passed along these
connections travel between peers and implement the communication layer of the
_Ripple peer protocol_.
The _Ripple payment network_ consists of a collection of _peers_ running the
**rippled software**. Each peer maintains multiple outgoing connections and
optional incoming connections to other peers. These connections are made over
both the public Internet and private local area networks. This network defines
a fully connected directed graph of nodes. Peers send and receive messages to
other connected peers. This peer to peer network, layered on top of the public
and private Internet, forms an [_overlay network_][overlay_network].
## Bootstrapping
When a peer comes online it needs a set of IP addresses to connect to in order to
gain initial entry into the overlay in a process called _bootstrapping_. Once they

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED
#define RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED
#include "Types.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -20,9 +20,12 @@
#ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#include "Config.h"
#include "Slot.h"
#include "Types.h"
#include "../../sitefiles/api/Manager.h"
#include "../../beast/modules/beast_core/files/File.h"
namespace ripple {

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_PEERFINDER_SLOT_H_INCLUDED
#define RIPPLE_PEERFINDER_SLOT_H_INCLUDED
#include "../../types/api/RipplePublicKey.h"
#include "../../beast/beast/net/IPEndpoint.h"
#include <boost/optional.hpp>

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_PEERFINDER_TYPES_H_INCLUDED
#define RIPPLE_PEERFINDER_TYPES_H_INCLUDED
#include "Endpoint.h"
#include "../../beast/beast/chrono/abstract_clock.h"
namespace ripple {

View File

@@ -20,9 +20,12 @@
#ifndef RIPPLE_RESOURCE_CONSUMER_H_INCLUDED
#define RIPPLE_RESOURCE_CONSUMER_H_INCLUDED
#include "Disposition.h"
namespace ripple {
namespace Resource {
struct Entry;
class Logic;
/** An endpoint that consumes resources. */

View File

@@ -20,6 +20,9 @@
#ifndef RIPPLE_RESOURCE_MANAGER_H_INCLUDED
#define RIPPLE_RESOURCE_MANAGER_H_INCLUDED
#include "Consumer.h"
#include "Gossip.h"
#include "../../beast/beast/insight/Collector.h"
#include "../../beast/beast/net/IPEndpoint.h"
#include "../../beast/beast/utility/Journal.h"

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_SITEFILES_LISTENER_H_INCLUDED
#define RIPPLE_SITEFILES_LISTENER_H_INCLUDED
#include "SiteFile.h"
namespace ripple {
namespace SiteFiles {

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_SITEFILES_MANAGER_H_INCLUDED
#define RIPPLE_SITEFILES_MANAGER_H_INCLUDED
#include "Listener.h"
#include "../../beast/beast/utility/PropertyStream.h"
namespace ripple {

View File

@@ -20,6 +20,9 @@
#ifndef RIPPLE_SITEFILES_SECTION_H_INCLUDED
#define RIPPLE_SITEFILES_SECTION_H_INCLUDED
#include <unordered_map>
#include <vector>
namespace ripple {
namespace SiteFiles {
@@ -30,7 +33,7 @@ namespace SiteFiles {
class Section
{
public:
typedef boost::unordered_map <std::string, std::string> MapType;
typedef std::unordered_map <std::string, std::string> MapType;
typedef std::vector <std::string> DataType;
Section(int = 0); // dummy argument for emplace()

View File

@@ -20,6 +20,11 @@
#ifndef RIPPLE_SITEFILES_SITEFILE_H_INCLUDED
#define RIPPLE_SITEFILES_SITEFILE_H_INCLUDED
#include "Section.h"
#include <string>
#include <unordered_map>
namespace ripple {
namespace SiteFiles {
@@ -28,7 +33,7 @@ class SiteFile
public:
SiteFile (int = 0); // dummy argument for emplace
typedef boost::unordered_map <std::string, Section> SectionsType;
typedef std::unordered_map <std::string, Section> SectionsType;
/** Retrieve a section by name. */
/** @{ */

View File

@@ -21,6 +21,8 @@
#define RIPPLE_TYPES_RIPPLEACCOUNTID_H_INCLUDED
#include "CryptoIdentifier.h"
#include "IdentifierType.h"
#include <array>
namespace ripple {

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_TYPES_RIPPLEPUBLICKEYHASH_H_INCLUDED
#define RIPPLE_TYPES_RIPPLEPUBLICKEYHASH_H_INCLUDED
#include "SimpleIdentifier.h"
namespace ripple {
/** Traits for the public key hash. */

View File

@@ -26,6 +26,7 @@
#define RIPPLE_TYPES_UINT160_H_INCLUDED
#include "base_uint.h"
#include "strHex.h"
namespace ripple {

View File

@@ -26,6 +26,8 @@
#define RIPPLE_TYPES_UINT256_H_INCLUDED
#include "base_uint.h"
#include "Blob.h"
#include "ByteOrder.h"
#include <functional>

View File

@@ -25,6 +25,8 @@
#ifndef RIPPLE_TYPES_BASE_UINT_H_INCLUDED
#define RIPPLE_TYPES_BASE_UINT_H_INCLUDED
#include "strHex.h"
#include "../../beast/beast/container/hardened_hash.h"
#include <functional>

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include "../../ripple_overlay/api/predicates.h"
namespace ripple {
SETUP_LOG (LedgerConsensus)
@@ -786,7 +788,7 @@ public:
/** A peer has informed us that it can give us a transaction set
*/
bool peerHasSet (Peer::ref peer, uint256 const& hashSet
bool peerHasSet (Peer::ptr const& peer, uint256 const& hashSet
, protocol::TxSetStatus status)
{
if (status != protocol::tsHAVE) // Indirect requests for future support
@@ -811,7 +813,7 @@ public:
/** A peer has sent us some nodes from a transaction set
*/
SHAMapAddNode peerGaveNodes (Peer::ref peer
SHAMapAddNode peerGaveNodes (Peer::ptr const& peer
, uint256 const& setHash, const std::list<SHAMapNode>& nodeIDs
, const std::list< Blob >& nodeData)
{
@@ -963,8 +965,8 @@ private:
Blob validation = v->getSigned ();
protocol::TMValidation val;
val.set_validation (&validation[0], validation.size ());
getApp ().getPeers ().foreach (send_always (
boost::make_shared <PackedMessage> (
getApp ().overlay ().foreach (send_always (
boost::make_shared <Message> (
val, protocol::mtVALIDATION)));
WriteLog (lsINFO, LedgerConsensus)
<< "CNF Val " << newLCLHash;
@@ -1076,7 +1078,7 @@ private:
while (pit != peerList.end ())
{
Peer::pointer pr = pit->lock ();
Peer::ptr pr = pit->lock ();
if (!pr)
{
@@ -1100,14 +1102,14 @@ private:
: acquire(acq)
{ }
return_type operator() (Peer::ref peer) const
return_type operator() (Peer::ptr const& peer) const
{
if (peer->hasTxSet (acquire->getHash ()))
acquire->peerHas (peer);
}
};
getApp().getPeers ().foreach (build_acquire_list (acquire));
getApp().overlay ().foreach (build_acquire_list (acquire));
acquire->setTimer ();
}
@@ -1201,8 +1203,8 @@ private:
msg.set_rawtransaction (& (tx.front ()), tx.size ());
msg.set_status (protocol::tsNEW);
msg.set_receivetimestamp (getApp().getOPs ().getNetworkTimeNC ());
getApp ().getPeers ().foreach (send_always (
boost::make_shared<PackedMessage> (
getApp ().overlay ().foreach (send_always (
boost::make_shared<Message> (
msg, protocol::mtTRANSACTION)));
}
}
@@ -1240,8 +1242,8 @@ private:
Blob sig = mOurPosition->sign ();
prop.set_nodepubkey (&pubKey[0], pubKey.size ());
prop.set_signature (&sig[0], sig.size ());
getApp ().getPeers ().foreach (send_always (
boost::make_shared<PackedMessage> (
getApp ().overlay ().foreach (send_always (
boost::make_shared<Message> (
prop, protocol::mtPROPOSE_LEDGER)));
}
@@ -1253,8 +1255,8 @@ private:
protocol::TMHaveTransactionSet msg;
msg.set_hash (hash.begin (), 256 / 8);
msg.set_status (direct ? protocol::tsHAVE : protocol::tsCAN_GET);
getApp ().getPeers ().foreach (send_always (
boost::make_shared <PackedMessage> (
getApp ().overlay ().foreach (send_always (
boost::make_shared <Message> (
msg, protocol::mtHAVE_SET)));
}
@@ -1458,8 +1460,8 @@ private:
}
s.set_firstseq (uMin);
s.set_lastseq (uMax);
getApp ().getPeers ().foreach (send_always (
boost::make_shared <PackedMessage> (
getApp ().overlay ().foreach (send_always (
boost::make_shared <Message> (
s, protocol::mtSTATUS_CHANGE)));
WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer";
}
@@ -1753,8 +1755,8 @@ private:
closetime
nodepubkey
signature
getApp ().getPeers ().foreach (send_if_not (
boost::make_shared<PackedMessage> (
getApp ().overlay ().foreach (send_if_not (
boost::make_shared<Message> (
set, protocol::mtPROPOSE_LEDGER),
peer_in_set(peers)));
}
@@ -1819,8 +1821,8 @@ private:
protocol::TMValidation val;
val.set_validation (&validation[0], validation.size ());
#if 0
getApp ().getPeers ().visit (RelayMessage (
boost::make_shared <PackedMessage> (
getApp ().overlay ().visit (RelayMessage (
boost::make_shared <Message> (
val, protocol::mtVALIDATION)));
#endif
getApp().getOPs ().setLastValidation (v);

View File

@@ -71,10 +71,10 @@ public:
virtual bool peerPosition (LedgerProposal::ref) = 0;
virtual bool peerHasSet (Peer::ref peer, uint256 const & set,
virtual bool peerHasSet (Peer::ptr const& peer, uint256 const & set,
protocol::TxSetStatus status) = 0;
virtual SHAMapAddNode peerGaveNodes (Peer::ref peer,
virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer,
uint256 const & setHash,
const std::list<SHAMapNode>& nodeIDs,
const std::list< Blob >& nodeData) = 0;

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include "../../ripple_overlay/api/Overlay.h"
namespace ripple {
//SETUP_LOG (InboundLedger)
@@ -90,7 +92,7 @@ void InboundLedger::init (ScopedLockType& collectionLock)
// For historical nodes, wait a bit since a
// fetch pack is probably coming
if (mReason != fcHISTORY)
trigger (Peer::pointer ());
trigger (Peer::ptr ());
}
else if (!isFailed ())
{
@@ -261,7 +263,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
"No progress(" << pc <<
") for ledger " << mHash;
trigger (Peer::pointer ());
trigger (Peer::ptr ());
if (pc < 4)
addPeers ();
}
@@ -270,7 +272,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
/** Add more peers to the set, if possible */
void InboundLedger::addPeers ()
{
Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers ();
Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
int vSize = peerList.size ();
@@ -297,7 +299,7 @@ void InboundLedger::addPeers ()
// First look for peers that are likely to have this ledger
for (int i = 0; i < vSize; ++i)
{
Peer::ref peer = peerList[ (i + firstPeer) % vSize];
Peer::ptr const& peer = peerList[ (i + firstPeer) % vSize];
if (peer->hasLedger (getHash (), mSeq))
{
@@ -406,7 +408,7 @@ bool InboundLedger::addOnComplete (
/** Request more nodes, perhaps from a specific peer
*/
void InboundLedger::trigger (Peer::ref peer)
void InboundLedger::trigger (Peer::ptr const& peer)
{
ScopedLockType sl (mLock);
@@ -484,7 +486,7 @@ void InboundLedger::trigger (Peer::ref peer)
}
}
PackedMessage::pointer packet (boost::make_shared <PackedMessage> (
Message::pointer packet (boost::make_shared <Message> (
tmBH, protocol::mtGET_OBJECTS));
{
ScopedLockType sl (mLock);
@@ -492,8 +494,8 @@ void InboundLedger::trigger (Peer::ref peer)
for (PeerSetMap::iterator it = mPeers.begin (), end = mPeers.end ();
it != end; ++it)
{
Peer::pointer iPeer (
getApp().getPeers ().findPeerByShortID (it->first));
Peer::ptr iPeer (
getApp().overlay ().findPeerByShortID (it->first));
if (iPeer)
{
@@ -1191,7 +1193,7 @@ void InboundLedger::runData ()
// breaking ties in favor of the peer that responded first.
BOOST_FOREACH (PeerDataPairType& entry, data)
{
Peer::pointer peer = entry.first.lock();
Peer::ptr peer = entry.first.lock();
if (peer)
{
int count = processData (peer, *(entry.second));

View File

@@ -82,7 +82,7 @@ public:
// VFALCO TODO Make this the Listener / Observer pattern
bool addOnComplete (std::function<void (InboundLedger::pointer)>);
void trigger (Peer::ref);
void trigger (Peer::ptr const&);
bool tryLocal ();
void addPeers ();
bool checkLocal ();
@@ -106,7 +106,7 @@ private:
void onTimer (bool progress, ScopedLockType& peerSetLock);
void newPeer (Peer::ref peer)
void newPeer (Peer::ptr const& peer)
{
trigger (peer);
}

View File

@@ -512,11 +512,11 @@ public:
*/
void getFetchPack (Ledger::ref nextLedger)
{
Peer::pointer target;
Peer::ptr target;
int count = 0;
Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers ();
BOOST_FOREACH (const Peer::pointer & peer, peerList)
Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
BOOST_FOREACH (const Peer::ptr & peer, peerList)
{
if (peer->hasRange (nextLedger->getLedgerSeq() - 1, nextLedger->getLedgerSeq()))
{
@@ -533,7 +533,7 @@ public:
tmBH.set_query (true);
tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK);
tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32);
PackedMessage::pointer packet = boost::make_shared<PackedMessage> (tmBH, protocol::mtGET_OBJECTS);
Message::pointer packet = boost::make_shared<Message> (tmBH, protocol::mtGET_OBJECTS);
target->sendPacket (packet, false);
WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1;

View File

@@ -19,6 +19,7 @@
#include "../ripple/common/seconds_clock.h"
#include "../ripple_rpc/api/Manager.h"
#include "../ripple_overlay/api/make_Overlay.h"
#include "Tuning.h"
@@ -153,7 +154,7 @@ public:
std::unique_ptr <beast::asio::SSLContext> m_peerSSLContext;
std::unique_ptr <beast::asio::SSLContext> m_wsSSLContext;
std::unique_ptr <Peers> m_peers;
std::unique_ptr <Overlay> m_peers;
std::unique_ptr <RPCDoor> m_rpcDoor;
std::unique_ptr <WSDoor> m_wsPublicDoor;
std::unique_ptr <WSDoor> m_wsPrivateDoor;
@@ -482,7 +483,7 @@ public:
return *mProofOfWorkFactory;
}
Peers& getPeers ()
Overlay& overlay ()
{
return *m_peers;
}
@@ -682,13 +683,15 @@ public:
}
// VFALCO NOTE Unfortunately, in stand-alone mode some code still
// foolishly calls getPeers(). When this is fixed we can
// foolishly calls overlay(). When this is fixed we can
// move the instantiation inside a conditional:
//
// if (!getConfig ().RUN_STANDALONE)
m_peers.reset (add (Peers::New (m_mainIoPool, *m_resourceManager,
m_peers = make_Overlay (m_mainIoPool, *m_resourceManager,
*m_siteFiles, getConfig ().getModuleDatabasePath (),
*m_resolver, m_mainIoPool, m_peerSSLContext->get ())));
*m_resolver, m_mainIoPool, m_peerSSLContext->get ());
// add to Stoppable
add (*m_peers);
// SSL context used for WebSocket connections.
if (getConfig ().WEBSOCKET_SECURE)

View File

@@ -36,7 +36,7 @@ class IFeatures;
class IFeeVote;
class IHashRouter;
class LoadFeeTrack;
class Peers;
class Overlay;
class UniqueNodeList;
class JobQueue;
class InboundLedgers;
@@ -94,7 +94,7 @@ public:
virtual IHashRouter& getHashRouter () = 0;
virtual LoadFeeTrack& getFeeTrack () = 0;
virtual LoadManager& getLoadManager () = 0;
virtual Peers& getPeers () = 0;
virtual Overlay& overlay () = 0;
virtual ProofOfWorkFactory& getProofOfWorkFactory () = 0;
virtual UniqueNodeList& getUNL () = 0;
virtual Validations& getValidations () = 0;

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include "../../ripple_overlay/api/predicates.h"
#include "../../beast/modules/beast_core/thread/DeadlineTimer.h"
#include "../../beast/modules/beast_core/system/SystemStats.h"
@@ -245,7 +247,7 @@ public:
// VFALCO TODO Try to make all these private since they seem to be...private
//
void switchLastClosedLedger (Ledger::pointer newLedger, bool duringConsensus); // Used for the "jump" case
bool checkLastClosedLedger (const Peers::PeerSequence&, uint256& networkClosed);
bool checkLastClosedLedger (const Overlay::PeerSequence&, uint256& networkClosed);
int beginConsensus (uint256 const& networkClosed, Ledger::pointer closingLedger);
void tryStartConsensus ();
void endConsensus (bool correctLCL);
@@ -539,7 +541,7 @@ void NetworkOPsImp::processHeartbeatTimer ()
LoadManager& mgr (app.getLoadManager ());
mgr.resetDeadlockDetector ();
std::size_t const numPeers = getApp().getPeers ().size ();
std::size_t const numPeers = getApp().overlay ().size ();
// do we have sufficient peers? If not, we are disconnected.
if (numPeers < getConfig ().NETWORK_QUORUM)
@@ -614,8 +616,8 @@ void NetworkOPsImp::processClusterTimer ()
node.set_name (to_string (item.address));
node.set_cost (item.balance);
}
getApp ().getPeers ().foreach (send_if (
boost::make_shared<PackedMessage>(cluster, protocol::mtCLUSTER),
getApp ().overlay ().foreach (send_if (
boost::make_shared<Message>(cluster, protocol::mtCLUSTER),
peer_in_cluster ()));
setClusterTimer ();
}
@@ -889,8 +891,8 @@ void NetworkOPsImp::runTransactionQueue ()
tx.set_rawtransaction (&s.getData ().front (), s.getLength ());
tx.set_status (protocol::tsCURRENT);
tx.set_receivetimestamp (getNetworkTimeNC ()); // FIXME: This should be when we received it
getApp ().getPeers ().foreach (send_if_not (
boost::make_shared<PackedMessage> (tx, protocol::mtTRANSACTION),
getApp ().overlay ().foreach (send_if_not (
boost::make_shared<Message> (tx, protocol::mtTRANSACTION),
peer_in_set(peers)));
}
else
@@ -1020,8 +1022,8 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
tx.set_rawtransaction (&s.getData ().front (), s.getLength ());
tx.set_status (protocol::tsCURRENT);
tx.set_receivetimestamp (getNetworkTimeNC ()); // FIXME: This should be when we received it
getApp ().getPeers ().foreach (send_if_not (
boost::make_shared<PackedMessage> (tx, protocol::mtTRANSACTION),
getApp ().overlay ().foreach (send_if_not (
boost::make_shared<Message> (tx, protocol::mtTRANSACTION),
peer_in_set(peers)));
}
}
@@ -1221,7 +1223,7 @@ public:
void NetworkOPsImp::tryStartConsensus ()
{
uint256 networkClosed;
bool ledgerChange = checkLastClosedLedger (getApp().getPeers ().getActivePeers (), networkClosed);
bool ledgerChange = checkLastClosedLedger (getApp().overlay ().getActivePeers (), networkClosed);
if (networkClosed.isZero ())
return;
@@ -1253,7 +1255,7 @@ void NetworkOPsImp::tryStartConsensus ()
beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ());
}
bool NetworkOPsImp::checkLastClosedLedger (const Peers::PeerSequence& peerList, uint256& networkClosed)
bool NetworkOPsImp::checkLastClosedLedger (const Overlay::PeerSequence& peerList, uint256& networkClosed)
{
// Returns true if there's an *abnormal* ledger issue, normal changing in TRACKING mode should return false
// Do we have sufficient validations for our last closed ledger? Or do sufficient nodes
@@ -1298,7 +1300,7 @@ bool NetworkOPsImp::checkLastClosedLedger (const Peers::PeerSequence& peerList,
ourVC.highNodeUsing = ourAddress;
}
BOOST_FOREACH (Peer::ref peer, peerList)
BOOST_FOREACH (Peer::ptr const& peer, peerList)
{
uint256 peerLedger = peer->getClosedLedgerHash ();
@@ -1430,8 +1432,8 @@ void NetworkOPsImp::switchLastClosedLedger (Ledger::pointer newLedger, bool duri
hash = newLedger->getHash ();
s.set_ledgerhash (hash.begin (), hash.size ());
getApp ().getPeers ().foreach (send_always (
boost::make_shared<PackedMessage> (s, protocol::mtSTATUS_CHANGE)));
getApp ().overlay ().foreach (send_always (
boost::make_shared<Message> (s, protocol::mtSTATUS_CHANGE)));
}
int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer closingLedger)
@@ -1481,7 +1483,7 @@ bool NetworkOPsImp::haveConsensusObject ()
{
// we need to get into the consensus process
uint256 networkClosed;
Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers ();
Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
bool ledgerChange = checkLastClosedLedger (peerList, networkClosed);
if (!ledgerChange)
@@ -1549,8 +1551,8 @@ void NetworkOPsImp::processTrustedProposal (LedgerProposal::pointer proposal,
if (getApp().getHashRouter ().swapSet (
proposal->getSuppressionID (), peers, SF_RELAYED))
{
getApp ().getPeers ().foreach (send_if_not (
boost::make_shared<PackedMessage> (*set, protocol::mtPROPOSE_LEDGER),
getApp ().overlay ().foreach (send_if_not (
boost::make_shared<Message> (*set, protocol::mtPROPOSE_LEDGER),
peer_in_set(peers)));
}
}
@@ -1640,9 +1642,9 @@ void NetworkOPsImp::endConsensus (bool correctLCL)
{
uint256 deadLedger = m_ledgerMaster.getClosedLedger ()->getParentHash ();
std::vector <Peer::pointer> peerList = getApp().getPeers ().getActivePeers ();
std::vector <Peer::ptr> peerList = getApp().overlay ().getActivePeers ();
BOOST_FOREACH (Peer::ref it, peerList)
BOOST_FOREACH (Peer::ptr const& it, peerList)
{
if (it && (it->getClosedLedgerHash () == deadLedger))
{
@@ -2203,7 +2205,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
if (fp != 0)
info["fetch_pack"] = Json::UInt (fp);
info["peers"] = Json::UInt (getApp ().getPeers ().size ());
info["peers"] = Json::UInt (getApp ().overlay ().size ());
Json::Value lastClose = Json::objectValue;
lastClose["proposers"] = getApp().getOPs ().getPreviousProposers ();
@@ -3193,7 +3195,7 @@ void NetworkOPsImp::makeFetchPack (Job&, boost::weak_ptr<Peer> wPeer,
try
{
Peer::pointer peer = wPeer.lock ();
Peer::ptr peer = wPeer.lock ();
if (!peer)
return;
@@ -3236,7 +3238,7 @@ void NetworkOPsImp::makeFetchPack (Job&, boost::weak_ptr<Peer> wPeer,
while (wantLedger && (UptimeTimer::getInstance ().getElapsedSeconds () <= (uUptime + 1)));
m_journal.info << "Built fetch pack with " << reply.objects ().size () << " nodes";
PackedMessage::pointer msg = boost::make_shared<PackedMessage> (reply, protocol::mtGET_OBJECTS);
Message::pointer msg = boost::make_shared<Message> (reply, protocol::mtGET_OBJECTS);
peer->sendPacket (msg, false);
}
catch (...)

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include"../../ripple_overlay/api/Overlay.h"
namespace ripple {
class InboundLedger;
@@ -50,7 +52,7 @@ PeerSet::~PeerSet ()
{
}
bool PeerSet::peerHas (Peer::ref ptr)
bool PeerSet::peerHas (Peer::ptr const& ptr)
{
ScopedLockType sl (mLock);
@@ -61,7 +63,7 @@ bool PeerSet::peerHas (Peer::ref ptr)
return true;
}
void PeerSet::badPeer (Peer::ref ptr)
void PeerSet::badPeer (Peer::ptr const& ptr)
{
ScopedLockType sl (mLock);
mPeers.erase (ptr->getShortId ());
@@ -141,12 +143,12 @@ bool PeerSet::isActive ()
return !isDone ();
}
void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ref peer)
void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ptr const& peer)
{
if (!peer)
sendRequest (tmGL);
else
peer->sendPacket (boost::make_shared<PackedMessage> (tmGL, protocol::mtGET_LEDGER), false);
peer->sendPacket (boost::make_shared<Message> (tmGL, protocol::mtGET_LEDGER), false);
}
void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL)
@@ -156,12 +158,12 @@ void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL)
if (mPeers.empty ())
return;
PackedMessage::pointer packet (
boost::make_shared<PackedMessage> (tmGL, protocol::mtGET_LEDGER));
Message::pointer packet (
boost::make_shared<Message> (tmGL, protocol::mtGET_LEDGER));
for (auto const& p : mPeers)
{
Peer::pointer peer (getApp().getPeers ().findPeerByShortID (p.first));
Peer::ptr peer (getApp().overlay ().findPeerByShortID (p.first));
if (peer)
peer->sendPacket (packet, false);
@@ -188,7 +190,7 @@ std::size_t PeerSet::getPeerCount () const
for (auto const& p : mPeers)
{
if (getApp ().getPeers ().findPeerByShortID (p.first))
if (getApp ().overlay ().findPeerByShortID (p.first))
++ret;
}

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_PEERSET_H
#define RIPPLE_PEERSET_H
#include "../../ripple_overlay/api/Peer.h"
namespace ripple {
/** A set of peers used to acquire data.
@@ -81,15 +83,15 @@ public:
// VFALCO TODO Rename this to addPeerToSet
//
bool peerHas (Peer::ref);
bool peerHas (Peer::ptr const&);
// VFALCO Workaround for MSVC std::function which doesn't swallow return types.
void peerHasVoid (Peer::ref peer)
void peerHasVoid (Peer::ptr const& peer)
{
peerHas (peer);
}
void badPeer (Peer::ref);
void badPeer (Peer::ptr const&);
void setTimer ();
@@ -113,7 +115,7 @@ protected:
clock_type& clock, beast::Journal journal);
virtual ~PeerSet () = 0;
virtual void newPeer (Peer::ref) = 0;
virtual void newPeer (Peer::ptr const&) = 0;
virtual void onTimer (bool progress, ScopedLockType&) = 0;
virtual boost::weak_ptr<PeerSet> pmDowncast () = 0;
@@ -128,7 +130,7 @@ protected:
void invokeOnTimer ();
void sendRequest (const protocol::TMGetLedger& message);
void sendRequest (const protocol::TMGetLedger& message, Peer::ref peer);
void sendRequest (const protocol::TMGetLedger& message, Peer::ptr const& peer);
protected:
beast::Journal m_journal;

View File

@@ -33,7 +33,6 @@
#include <boost/array.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
//#include <boost/iostreams/concepts.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/mem_fn.hpp>
#include <boost/pointer_cast.hpp>
@@ -53,7 +52,7 @@
#include "../ripple/common/ResolverAsio.h"
//#include "../beast/modules/beast_sqdb/beast_sqdb.h"
// VFALCO TODO Remove this include
#include "../beast/modules/beast_sqlite/beast_sqlite.h"
// Order matters here. If you get compile errors,
@@ -62,8 +61,6 @@
#include "../../ripple/common/KeyCache.h"
#include "../../ripple/common/TaggedCache.h"
#include "../../ripple_overlay/ripple_overlay.h"
#include "data/Database.h"
#include "data/DatabaseCon.h"
#include "data/SqliteDatabase.h"

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include "../../ripple_overlay/api/Overlay.h"
#include "../../beast/beast/unit_test/suite.h"
namespace ripple {
@@ -863,7 +865,7 @@ Json::Value RPCHandler::doConnect (Json::Value params,
beast::IP::Endpoint ip (beast::IP::Endpoint::from_string(params["ip"].asString ()));
if (! is_unspecified (ip))
getApp().getPeers ().connect (ip.at_port(iPort));
getApp().overlay ().connect (ip.at_port(iPort));
return "connecting";
}
@@ -934,7 +936,7 @@ Json::Value RPCHandler::doPeers (Json::Value, Resource::Charge& loadType, Applic
{
Json::Value jvResult (Json::objectValue);
jvResult["peers"] = getApp().getPeers ().json ();
jvResult["peers"] = getApp().overlay ().json ();
getApp().getUNL().addClusterStatus(jvResult);

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include "../../ripple_overlay/api/Overlay.h"
namespace ripple {
//SETUP_LOG (TransactionAcquire)
@@ -110,8 +112,8 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash ();
bool found = false;
Peers::PeerSequence peerList = getApp().getPeers ().getActivePeers ();
BOOST_FOREACH (Peer::ref peer, peerList)
Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
BOOST_FOREACH (Peer::ptr const& peer, peerList)
{
if (peer->hasTxSet (getHash ()))
{
@@ -122,12 +124,12 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl)
if (!found)
{
BOOST_FOREACH (Peer::ref peer, peerList)
BOOST_FOREACH (Peer::ptr const& peer, peerList)
peerHas (peer);
}
}
else if (!progress)
trigger (Peer::pointer ());
trigger (Peer::ptr ());
}
boost::weak_ptr<PeerSet> TransactionAcquire::pmDowncast ()
@@ -135,7 +137,7 @@ boost::weak_ptr<PeerSet> TransactionAcquire::pmDowncast ()
return boost::dynamic_pointer_cast<PeerSet> (shared_from_this ());
}
void TransactionAcquire::trigger (Peer::ref peer)
void TransactionAcquire::trigger (Peer::ptr const& peer)
{
if (mComplete)
{
@@ -196,7 +198,7 @@ void TransactionAcquire::trigger (Peer::ref peer)
}
SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNode>& nodeIDs,
const std::list< Blob >& data, Peer::ref peer)
const std::list< Blob >& data, Peer::ptr const& peer)
{
if (mComplete)
{

View File

@@ -44,20 +44,20 @@ public:
}
SHAMapAddNode takeNodes (const std::list<SHAMapNode>& IDs,
const std::list< Blob >& data, Peer::ref);
const std::list< Blob >& data, Peer::ptr const&);
private:
SHAMap::pointer mMap;
bool mHaveRoot;
void onTimer (bool progress, ScopedLockType& peerSetLock);
void newPeer (Peer::ref peer)
void newPeer (Peer::ptr const& peer)
{
trigger (peer);
}
void done ();
void trigger (Peer::ref);
void trigger (Peer::ptr const&);
boost::weak_ptr<PeerSet> pmDowncast ();
};

View File

@@ -61,14 +61,6 @@ public:
keyBytes = 32,
};
/** The type used to hold the hash.
The hahes are fixed size, SHA256.
@note The key size can be retrieved with `Hash::sizeInBytes`
*/
typedef beast::UnsignedInteger <32> Hash;
// Please use this one. For a reference use Ptr const&
typedef boost::shared_ptr <NodeObject> Ptr;

View File

@@ -32,6 +32,8 @@
#ifndef RIPPLE_BASE58DATA_H
#define RIPPLE_BASE58DATA_H
#include ".././ripple/types/api/Base58.h"
namespace ripple {
class CBase58Data

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_PROTOCOL_H
#define RIPPLE_PROTOCOL_H
#include <cstdint>
namespace ripple {
/** Protocol specific constants, types, and data.

View File

@@ -20,8 +20,20 @@
#ifndef RIPPLE_RIPPLEADDRESS_H
#define RIPPLE_RIPPLEADDRESS_H
#include "../crypto/Base58Data.h"
#include "../ripple/types/api/UInt160.h"
#include "../ripple/types/api/RippleAccountID.h"
#include "../ripple/types/api/RippleAccountPrivateKey.h"
#include "../ripple/types/api/RippleAccountPublicKey.h"
#include "../ripple/types/api/RipplePrivateKey.h"
#include "../ripple/types/api/RipplePublicKey.h"
#include "../ripple/types/api/RipplePublicKeyHash.h"
#include "../ripple/sslutil/api/ECDSACanonical.h"
struct bignum_st;
typedef struct bignum_st BIGNUM;
namespace ripple {
//

View File

@@ -226,7 +226,7 @@ message TMGetContacts
message TMGetPeers
{
required uint32 doWeNeedThis = 1; // yes since you are asserting that the packet size isn't 0 in PackedMessage
required uint32 doWeNeedThis = 1; // yes since you are asserting that the packet size isn't 0 in Message
}
message TMIPv4Endpoint

View File

@@ -1,4 +1,16 @@
# Overlay
A module which manages peer connections that operate the _Ripple
peer protocol_.
## Introduction
The _Ripple payment network_ consists of a collection of _peers_ running the
**rippled software**. Each peer maintains multiple outgoing connections and
optional incoming connections to other peers. These connections are made over
both the public Internet and private local area networks. This network defines
a fully connected directed graph of nodes. Peers send and receive messages to
other connected peers. This peer to peer network, layered on top of the public
and private Internet, forms an [_overlay network_][overlay_network]. The
contents of the messages and the behavior of peers in response to the messages,
plus the information exchanged during the handshaking phase of connection
establishment, defines the _Peer Protocol_.
[overlay_network]: http://en.wikipedia.org/wiki/Overlay_network

View File

@@ -17,8 +17,8 @@
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_PACKEDMESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_PACKEDMESSAGE_H_INCLUDED
#ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#include "ripple.pb.h"
@@ -28,7 +28,7 @@
namespace ripple {
// VFALCO NOTE If we forward declare PackedMessage and write out shared_ptr
// VFALCO NOTE If we forward declare Message and write out shared_ptr
// instead of using the in-class typedef, we can remove the entire
// ripple.pb.h from the main headers.
//
@@ -36,42 +36,37 @@ namespace ripple {
// packaging of messages into length/type-prepended buffers
// ready for transmission.
//
// PackedMessage implements simple "packing" of protocol buffers Messages into
// Message implements simple "packing" of protocol buffers Messages into
// a string prepended by a header specifying the message length.
// MessageType should be a Message class generated by the protobuf compiler.
//
class PackedMessage : public boost::enable_shared_from_this <PackedMessage>
class Message : public boost::enable_shared_from_this <Message>
{
public:
typedef boost::shared_ptr< ::google::protobuf::Message > MessagePointer;
typedef boost::shared_ptr<PackedMessage> pointer;
typedef boost::shared_ptr<Message> pointer;
public:
/** Number of bytes in a message header.
*/
static size_t const kHeaderBytes = 6;
PackedMessage (::google::protobuf::Message const& message, int type);
Message (::google::protobuf::Message const& message, int type);
/** Retrieve the packed message data.
*/
// VFALCO TODO shouldn't this be const?
std::vector <uint8_t>& getBuffer ()
/** Retrieve the packed message data. */
std::vector <uint8_t> const&
getBuffer () const
{
return mBuffer;
}
/** Determine bytewise equality.
*/
bool operator == (PackedMessage const& other) const;
/** Determine bytewise equality. */
bool operator == (Message const& other) const;
/** Calculate the length of a packed message.
*/
/** Calculate the length of a packed message. */
static unsigned getLength (std::vector <uint8_t> const& buf);
/** Determine the type of a packed message.
*/
/** Determine the type of a packed message. */
static int getType (std::vector <uint8_t> const& buf);
private:

View File

@@ -0,0 +1,131 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_OVERLAY_H_INCLUDED
#define RIPPLE_OVERLAY_OVERLAY_H_INCLUDED
#include "Peer.h"
// VFALCO TODO Remove this include dependency it shouldn't be needed
#include "../../ripple/peerfinder/api/Slot.h"
#include "../../beast/beast/threads/Stoppable.h"
#include "../../beast/beast/utility/PropertyStream.h"
#include "../../beast/beast/cxx14/type_traits.h" // <type_traits>
namespace ripple {
/** Manages the set of connected peers. */
class Overlay
: public beast::Stoppable
, public beast::PropertyStream::Source
{
protected:
// VFALCO NOTE The requirement of this constructor is an
// unfortunate problem with the API for
// Stoppable and PropertyStream
//
Overlay (Stoppable& parent)
: Stoppable ("Overlay", parent)
, beast::PropertyStream::Source ("peers")
{
}
public:
typedef std::vector <Peer::ptr> PeerSequence;
virtual ~Overlay () = default;
// VFALCO NOTE These should be a private API
/** @{ */
virtual void remove (PeerFinder::Slot::ptr const& slot) = 0;
/** @} */
virtual void connect (beast::IP::Endpoint const& address) = 0;
// Notification that a peer has connected.
virtual void onPeerActivated (Peer::ptr const& peer) = 0;
// Notification that a peer has disconnected.
virtual void onPeerDisconnect (Peer::ptr const& peer) = 0;
virtual std::size_t size () = 0;
virtual Json::Value json () = 0;
virtual PeerSequence getActivePeers () = 0;
// Peer 64-bit ID function
virtual Peer::ptr findPeerByShortID (Peer::ShortId const& id) = 0;
/** Visit every active peer and return a value
The functor must:
- Be callable as:
void operator()(Peer::ptr const& peer);
- Must have the following typedef:
typedef void return_type;
- Be callable as:
Function::return_type operator()() const;
@param f the functor to call with every peer
@returns `f()`
@note The functor is passed by value!
*/
template<typename Function>
std::enable_if_t <
! std::is_void <typename Function::return_type>::value,
typename Function::return_type
>
foreach(Function f)
{
PeerSequence peers (getActivePeers());
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
return f();
}
/** Visit every active peer
The visitor functor must:
- Be callable as:
void operator()(Peer::ptr const& peer);
- Must have the following typedef:
typedef void return_type;
@param f the functor to call with every peer
*/
template <class Function>
std::enable_if_t <
std::is_void <typename Function::return_type>::value,
typename Function::return_type
>
foreach(Function f)
{
PeerSequence peers (getActivePeers());
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
}
};
}
#endif

View File

@@ -20,81 +20,60 @@
#ifndef RIPPLE_OVERLAY_PEER_H_INCLUDED
#define RIPPLE_OVERLAY_PEER_H_INCLUDED
#include "../../beast/beast/net/IPEndpoint.h"
#include "Message.h"
#include <boost/asio.hpp>
#include "../../ripple/json/ripple_json.h"
#include "../../ripple/types/api/UInt256.h"
#include "../../ripple_data/protocol/RippleAddress.h"
#include "../../beast/beast/net/IPEndpoint.h"
namespace ripple {
typedef boost::asio::ip::tcp::socket NativeSocketType;
namespace Resource {
class Charge;
class Manager;
}
namespace PeerFinder {
class Manager;
}
class Peers;
/** Represents a peer connection in the overlay.
*/
class Peer : private beast::LeakChecked <Peer>
/** Represents a peer connection in the overlay. */
class Peer
{
public:
typedef boost::shared_ptr <Peer> Ptr;
// DEPRECATED typedefs.
typedef boost::shared_ptr <Peer> pointer;
typedef pointer const& ref;
/** Uniquely identifies a particular connection of a peer
This works upto a restart of rippled.
*/
typedef boost::shared_ptr <Peer> ptr;
/** Uniquely identifies a particular connection of a peer. */
typedef std::uint32_t ShortId;
virtual void sendPacket (const PackedMessage::pointer& packet, bool onStrand) = 0;
/** Adjust this peer's load balance based on the type of load imposed.
@note Formerly named punishPeer
*/
virtual void charge (Resource::Charge const& fee) = 0;
static void charge (boost::weak_ptr <Peer>& peer, Resource::Charge const& fee);
virtual Json::Value json () = 0;
virtual bool isInCluster () const = 0;
virtual std::string getClusterNodeName() const = 0;
virtual uint256 const& getClosedLedgerHash () const = 0;
virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0;
virtual void getLedger (protocol::TMGetLedger &) = 0;
virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
virtual bool hasTxSet (uint256 const& hash) const = 0;
virtual void setShortId(Peer::ShortId shortId) = 0;
virtual ShortId getShortId () const = 0;
virtual const RippleAddress& getNodePublic () const = 0;
virtual void cycleStatus () = 0;
virtual bool supportsVersion (int version) = 0;
virtual bool hasRange (std::uint32_t uMin, std::uint32_t uMax) = 0;
//
// Network
//
virtual void sendPacket (const Message::pointer& packet, bool onStrand) = 0;
virtual beast::IP::Endpoint getRemoteAddress() const = 0;
virtual NativeSocketType& getNativeSocket () = 0;
/** Adjust this peer's load balance based on the type of load imposed. */
virtual void charge (Resource::Charge const& fee) = 0;
//
// Identity
//
virtual ShortId getShortId () const = 0;
virtual const RippleAddress& getNodePublic () const = 0;
virtual Json::Value json () = 0;
virtual bool isInCluster () const = 0;
virtual std::string getClusterNodeName() const = 0;
//
// Ledger
//
virtual uint256 const& getClosedLedgerHash () const = 0;
virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0;
virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
virtual bool hasTxSet (uint256 const& hash) const = 0;
virtual void cycleStatus () = 0;
virtual bool supportsVersion (int version) = 0;
virtual bool hasRange (std::uint32_t uMin, std::uint32_t uMax) = 0;
};
}

View File

@@ -1,300 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERS_H_INCLUDED
#define RIPPLE_PEERS_H_INCLUDED
// VFALCO TODO Remove this include dependency it shouldn't be needed
#include "../../ripple/peerfinder/api/Slot.h"
#include "../../ripple/common/Resolver.h"
#include "../../beast/beast/threads/Stoppable.h"
#include "../../beast/modules/beast_core/files/File.h"
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <set>
namespace ripple {
namespace PeerFinder {
struct Endpoint;
class Manager;
}
namespace Resource {
class Manager;
}
namespace SiteFiles {
class Manager;
}
//------------------------------------------------------------------------------
/** Manages the set of connected peers. */
class Peers
: public beast::Stoppable
, public beast::PropertyStream::Source
{
protected:
// VFALCO NOTE The requirement of this constructor is an
// unfortunate problem with the API for
// Stoppable and PropertyStream
//
Peers (Stoppable& parent)
: Stoppable ("Peers", parent)
, beast::PropertyStream::Source ("peers")
{
}
public:
typedef std::vector <Peer::pointer> PeerSequence;
static Peers* New (
Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& context);
virtual ~Peers () = 0;
// VFALCO NOTE These should be a private API
/** @{ */
virtual void remove (PeerFinder::Slot::ptr const& slot) = 0;
/** @} */
virtual void accept (bool proxyHandshake,
boost::shared_ptr <NativeSocketType> const& socket) = 0;
virtual void connect (beast::IP::Endpoint const& address) = 0;
// Notification that a peer has connected.
virtual void onPeerActivated (Peer::ref peer) = 0;
// Notification that a peer has disconnected.
virtual void onPeerDisconnect (Peer::ref peer) = 0;
virtual std::size_t size () = 0;
virtual Json::Value json () = 0;
virtual PeerSequence getActivePeers () = 0;
// Peer 64-bit ID function
virtual Peer::pointer findPeerByShortID (Peer::ShortId const& id) = 0;
/** Visit every active peer and return a value
The functor must:
- Be callable as:
void operator()(Peer::ref peer);
- Must have the following typedef:
typedef void return_type;
- Be callable as:
Function::return_type operator()() const;
@param f the functor to call with every peer
@returns `f()`
@note The functor is passed by value!
*/
template<typename Function>
typename boost::disable_if <
boost::is_void <typename Function::return_type>,
typename Function::return_type>::type
foreach(Function f)
{
PeerSequence peers (getActivePeers());
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
return f();
}
/** Visit every active peer
The visitor functor must:
- Be callable as:
void operator()(Peer::ref peer);
- Must have the following typedef:
typedef void return_type;
@param f the functor to call with every peer
*/
template <class Function>
typename boost::enable_if <
boost::is_void <typename Function::return_type>,
typename Function::return_type>::type
foreach(Function f)
{
PeerSequence peers (getActivePeers());
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
}
};
/** Sends a message to all peers */
struct send_always
{
typedef void return_type;
PackedMessage::pointer const& msg;
send_always(PackedMessage::pointer const& m)
: msg(m)
{ }
void operator()(Peer::ref peer) const
{
peer->sendPacket (msg, false);
}
};
//------------------------------------------------------------------------------
/** Sends a message to match peers */
template <typename Predicate>
struct send_if_pred
{
typedef void return_type;
PackedMessage::pointer const& msg;
Predicate const& predicate;
send_if_pred(PackedMessage::pointer const& m, Predicate const& p)
: msg(m), predicate(p)
{ }
void operator()(Peer::ref peer) const
{
if (predicate (peer))
peer->sendPacket (msg, false);
}
};
/** Helper function to aid in type deduction */
template <typename Predicate>
send_if_pred<Predicate> send_if (
PackedMessage::pointer const& m,
Predicate const &f)
{
return send_if_pred<Predicate>(m, f);
}
//------------------------------------------------------------------------------
/** Sends a message to non-matching peers */
template <typename Predicate>
struct send_if_not_pred
{
typedef void return_type;
PackedMessage::pointer const& msg;
Predicate const& predicate;
send_if_not_pred(PackedMessage::pointer const& m, Predicate const& p)
: msg(m), predicate(p)
{ }
void operator()(Peer::ref peer) const
{
if (!predicate (peer))
peer->sendPacket (msg, false);
}
};
/** Helper function to aid in type deduction */
template <typename Predicate>
send_if_not_pred<Predicate> send_if_not (
PackedMessage::pointer const& m,
Predicate const &f)
{
return send_if_not_pred<Predicate>(m, f);
}
//------------------------------------------------------------------------------
/** Select the specific peer */
struct match_peer
{
Peer const* matchPeer;
match_peer (Peer const* match = nullptr)
: matchPeer (match)
{ }
bool operator() (Peer::ref peer) const
{
if(matchPeer && (peer.get () == matchPeer))
return true;
return false;
}
};
//------------------------------------------------------------------------------
/** Select all peers (except optional excluded) that are in our cluster */
struct peer_in_cluster
{
match_peer skipPeer;
peer_in_cluster (Peer const* skip = nullptr)
: skipPeer (skip)
{ }
bool operator() (Peer::ref peer) const
{
if (skipPeer (peer))
return false;
if (!peer->isInCluster ())
return false;
return true;
}
};
//------------------------------------------------------------------------------
/** Select all peers that are in the specified set */
struct peer_in_set
{
std::set <Peer::ShortId> const& peerSet;
peer_in_set (std::set<Peer::ShortId> const& peers)
: peerSet (peers)
{ }
bool operator() (Peer::ref peer) const
{
if (peerSet.count (peer->getShortId ()) == 0)
return false;
return true;
}
};
}
#endif

View File

@@ -0,0 +1,62 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_MAKE_OVERLAY_H_INCLUDED
#define RIPPLE_OVERLAY_MAKE_OVERLAY_H_INCLUDED
#include "Overlay.h"
#include "../../ripple/resource/api/Manager.h"
#include "../../ripple/sitefiles/api/Manager.h"
#include "../../ripple/common/Resolver.h"
#include "../../beast/beast/threads/Stoppable.h"
#include "../../beast/modules/beast_core/files/File.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/ssl/context.hpp>
namespace ripple {
// VFALCO This is separated so that users of the Overlay interface do not need
// to know about creation details such as asio or ssl.
/** Creates the implementation of Overlay.
@param parent
@param resourceManager
@param siteFiles
@param pathToDbFileOrDirectory
@param resolver
@param io_service
@param context
*/
std::unique_ptr <Overlay>
make_Overlay (
beast::Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context);
} // ripple
#endif

View File

@@ -0,0 +1,174 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_PREDICATES_H_INCLUDED
#define RIPPLE_OVERLAY_PREDICATES_H_INCLUDED
#include "Message.h"
#include "Peer.h"
#include <set>
namespace ripple {
/** Sends a message to all peers */
struct send_always
{
typedef void return_type;
Message::pointer const& msg;
send_always(Message::pointer const& m)
: msg(m)
{ }
void operator()(Peer::ptr const& peer) const
{
peer->sendPacket (msg, false);
}
};
//------------------------------------------------------------------------------
/** Sends a message to match peers */
template <typename Predicate>
struct send_if_pred
{
typedef void return_type;
Message::pointer const& msg;
Predicate const& predicate;
send_if_pred(Message::pointer const& m, Predicate const& p)
: msg(m), predicate(p)
{ }
void operator()(Peer::ptr const& peer) const
{
if (predicate (peer))
peer->sendPacket (msg, false);
}
};
/** Helper function to aid in type deduction */
template <typename Predicate>
send_if_pred<Predicate> send_if (
Message::pointer const& m,
Predicate const &f)
{
return send_if_pred<Predicate>(m, f);
}
//------------------------------------------------------------------------------
/** Sends a message to non-matching peers */
template <typename Predicate>
struct send_if_not_pred
{
typedef void return_type;
Message::pointer const& msg;
Predicate const& predicate;
send_if_not_pred(Message::pointer const& m, Predicate const& p)
: msg(m), predicate(p)
{ }
void operator()(Peer::ptr const& peer) const
{
if (!predicate (peer))
peer->sendPacket (msg, false);
}
};
/** Helper function to aid in type deduction */
template <typename Predicate>
send_if_not_pred<Predicate> send_if_not (
Message::pointer const& m,
Predicate const &f)
{
return send_if_not_pred<Predicate>(m, f);
}
//------------------------------------------------------------------------------
/** Select the specific peer */
struct match_peer
{
Peer const* matchPeer;
match_peer (Peer const* match = nullptr)
: matchPeer (match)
{ }
bool operator() (Peer::ptr const& peer) const
{
if(matchPeer && (peer.get () == matchPeer))
return true;
return false;
}
};
//------------------------------------------------------------------------------
/** Select all peers (except optional excluded) that are in our cluster */
struct peer_in_cluster
{
match_peer skipPeer;
peer_in_cluster (Peer const* skip = nullptr)
: skipPeer (skip)
{ }
bool operator() (Peer::ptr const& peer) const
{
if (skipPeer (peer))
return false;
if (!peer->isInCluster ())
return false;
return true;
}
};
//------------------------------------------------------------------------------
/** Select all peers that are in the specified set */
struct peer_in_set
{
std::set <Peer::ShortId> const& peerSet;
peer_in_set (std::set<Peer::ShortId> const& peers)
: peerSet (peers)
{ }
bool operator() (Peer::ptr const& peer) const
{
if (peerSet.count (peer->getShortId ()) == 0)
return false;
return true;
}
};
}
#endif

View File

@@ -17,9 +17,13 @@
*/
//==============================================================================
#include "../api/Message.h"
#include <cstdint>
namespace ripple {
PackedMessage::PackedMessage (::google::protobuf::Message const& message, int type)
Message::Message (::google::protobuf::Message const& message, int type)
{
unsigned const messageBytes = message.ByteSize ();
@@ -31,24 +35,24 @@ PackedMessage::PackedMessage (::google::protobuf::Message const& message, int ty
if (messageBytes != 0)
{
message.SerializeToArray (&mBuffer [PackedMessage::kHeaderBytes], messageBytes);
message.SerializeToArray (&mBuffer [Message::kHeaderBytes], messageBytes);
#ifdef BEAST_DEBUG
//Log::out() << "PackedMessage: type=" << type << ", datalen=" << msg_size;
//Log::out() << "Message: type=" << type << ", datalen=" << msg_size;
#endif
}
}
bool PackedMessage::operator== (PackedMessage const& other) const
bool Message::operator== (Message const& other) const
{
return mBuffer == other.mBuffer;
}
unsigned PackedMessage::getLength (std::vector <uint8_t> const& buf)
unsigned Message::getLength (std::vector <uint8_t> const& buf)
{
unsigned result;
if (buf.size () >= PackedMessage::kHeaderBytes)
if (buf.size () >= Message::kHeaderBytes)
{
result = buf [0];
result <<= 8;
@@ -66,9 +70,9 @@ unsigned PackedMessage::getLength (std::vector <uint8_t> const& buf)
return result;
}
int PackedMessage::getType (std::vector<uint8_t> const& buf)
int Message::getType (std::vector<uint8_t> const& buf)
{
if (buf.size () < PackedMessage::kHeaderBytes)
if (buf.size () < Message::kHeaderBytes)
return 0;
int ret = buf[4];
@@ -77,15 +81,15 @@ int PackedMessage::getType (std::vector<uint8_t> const& buf)
return ret;
}
void PackedMessage::encodeHeader (unsigned size, int type)
void Message::encodeHeader (unsigned size, int type)
{
assert (mBuffer.size () >= PackedMessage::kHeaderBytes);
mBuffer[0] = static_cast<boost::uint8_t> ((size >> 24) & 0xFF);
mBuffer[1] = static_cast<boost::uint8_t> ((size >> 16) & 0xFF);
mBuffer[2] = static_cast<boost::uint8_t> ((size >> 8) & 0xFF);
mBuffer[3] = static_cast<boost::uint8_t> (size & 0xFF);
mBuffer[4] = static_cast<boost::uint8_t> ((type >> 8) & 0xFF);
mBuffer[5] = static_cast<boost::uint8_t> (type & 0xFF);
assert (mBuffer.size () >= Message::kHeaderBytes);
mBuffer[0] = static_cast<std::uint8_t> ((size >> 24) & 0xFF);
mBuffer[1] = static_cast<std::uint8_t> ((size >> 16) & 0xFF);
mBuffer[2] = static_cast<std::uint8_t> ((size >> 8) & 0xFF);
mBuffer[3] = static_cast<std::uint8_t> (size & 0xFF);
mBuffer[4] = static_cast<std::uint8_t> ((type >> 8) & 0xFF);
mBuffer[5] = static_cast<std::uint8_t> (type & 0xFF);
}
}

View File

@@ -0,0 +1,43 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGESTREAM_H_INCLUDED
#include <cstddef>
namespace ripple {
/** Turns blocks of incoming data into protocol messages. */
class MessageStream
{
private:
std::size_t m_bytes;
std::vector <uint8_t> m_buffer;
public:
void
write (void const* buffer, std::size_t bytes)
{
}
};
} // ripple
#endif

View File

@@ -0,0 +1,598 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "OverlayImpl.h"
#include "PeerDoor.h"
#include "PeerImp.h"
#include "../../beast/beast/ByteOrder.h"
namespace ripple {
SETUP_LOG (Peer)
class PeersLog;
template <> char const* LogPartition::getPartitionName <PeersLog> () { return "Overlay"; }
class PeerFinderLog;
template <> char const* LogPartition::getPartitionName <PeerFinderLog> () { return "PeerFinder"; }
class NameResolverLog;
template <> char const* LogPartition::getPartitionName <NameResolverLog> () { return "NameResolver"; }
/** Calls a function during static initialization. */
struct static_call
{
// Function must be callable as
// void f (void) const
//
template <class Function>
static_call (Function const& f)
{
f ();
}
};
static static_call init_PeerFinderLog (&LogPartition::get <PeerFinderLog>);
static static_call init_NameResolverLog (&LogPartition::get <NameResolverLog>);
//------------------------------------------------------------------------------
/** A functor to visit all active peers and retrieve their JSON data */
struct get_peer_json
{
typedef Json::Value return_type;
Json::Value json;
get_peer_json ()
{ }
void operator() (Peer::ptr const& peer)
{
json.append (peer->json ());
}
Json::Value operator() ()
{
return json;
}
};
//------------------------------------------------------------------------------
OverlayImpl::OverlayImpl (Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
: Overlay (parent)
, m_child_count (1)
, m_journal (LogPartition::getJournal <PeersLog> ())
, m_resourceManager (resourceManager)
, m_peerFinder (add (PeerFinder::Manager::New (
*this,
siteFiles,
pathToDbFileOrDirectory,
*this,
get_seconds_clock (),
LogPartition::getJournal <PeerFinderLog> ())))
, m_io_service (io_service)
, m_ssl_context (ssl_context)
, m_resolver (resolver)
{
}
OverlayImpl::~OverlayImpl ()
{
// Block until dependent objects have been destroyed.
// This is just to catch improper use of the Stoppable API.
//
std::unique_lock <decltype(m_mutex)> lock (m_mutex);
m_cond.wait (lock, [this] {
return this->m_child_count == 0; });
}
void
OverlayImpl::accept (bool proxyHandshake, socket_type&& socket)
{
// An error getting an endpoint means the connection closed.
// Just do nothing and the socket will be closed by the caller.
boost::system::error_code ec;
auto const local_endpoint_native (socket.local_endpoint (ec));
if (ec)
return;
auto const remote_endpoint_native (socket.remote_endpoint (ec));
if (ec)
return;
auto const local_endpoint (
beast::IPAddressConversion::from_asio (local_endpoint_native));
auto const remote_endpoint (
beast::IPAddressConversion::from_asio (remote_endpoint_native));
PeerFinder::Slot::ptr const slot (m_peerFinder->new_inbound_slot (
local_endpoint, remote_endpoint));
if (slot == nullptr)
return;
MultiSocket::Flag flags (
MultiSocket::Flag::server_role | MultiSocket::Flag::ssl_required);
if (proxyHandshake)
flags = flags.with (MultiSocket::Flag::proxy);
PeerImp::ptr const peer (boost::make_shared <PeerImp> (
std::move (socket), remote_endpoint, *this, m_resourceManager,
*m_peerFinder, slot, m_ssl_context, flags));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
}
++m_child_count;
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
}
}
void
OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
{
if (isStopping())
{
m_journal.debug <<
"Skipping " << remote_endpoint <<
" connect on stop";
return;
}
PeerFinder::Slot::ptr const slot (
m_peerFinder->new_outbound_slot (remote_endpoint));
if (slot == nullptr)
return;
MultiSocket::Flag const flags (
MultiSocket::Flag::client_role | MultiSocket::Flag::ssl);
PeerImp::ptr const peer (boost::make_shared <PeerImp> (
remote_endpoint, m_io_service, *this, m_resourceManager,
*m_peerFinder, slot, m_ssl_context, flags));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
}
++m_child_count;
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
}
}
Peer::ShortId
OverlayImpl::next_id()
{
return ++m_nextShortId;
}
//--------------------------------------------------------------------------
// Check for the stopped condition
// Caller must hold the mutex
void
OverlayImpl::check_stopped ()
{
// To be stopped, child Stoppable objects must be stopped
// and the count of dependent objects must be zero
if (areChildrenStopped () && m_child_count == 0)
{
m_cond.notify_all ();
m_journal.info <<
"Stopped.";
stopped ();
}
}
// Decrement the count of dependent objects
// Caller must hold the mutex
void
OverlayImpl::release ()
{
if (--m_child_count == 0)
check_stopped ();
}
void
OverlayImpl::remove (PeerFinder::Slot::ptr const& slot)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
m_peers.erase (iter);
release();
}
//--------------------------------------------------------------------------
//
// PeerFinder::Callback
//
//--------------------------------------------------------------------------
void
OverlayImpl::connect (std::vector <beast::IP::Endpoint> const& list)
{
for (std::vector <beast::IP::Endpoint>::const_iterator iter (list.begin());
iter != list.end(); ++iter)
connect (*iter);
}
void
OverlayImpl::activate (PeerFinder::Slot::ptr const& slot)
{
m_journal.trace <<
"Activate " << slot->remote_endpoint();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
peer->activate ();
}
void
OverlayImpl::send (PeerFinder::Slot::ptr const& slot,
std::vector <PeerFinder::Endpoint> const& endpoints)
{
typedef std::vector <PeerFinder::Endpoint> List;
protocol::TMEndpoints tm;
for (List::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
{
PeerFinder::Endpoint const& ep (*iter);
protocol::TMEndpoint& tme (*tm.add_endpoints());
if (ep.address.is_v4())
tme.mutable_ipv4()->set_ipv4(
beast::toNetworkByteOrder (ep.address.to_v4().value));
else
tme.mutable_ipv4()->set_ipv4(0);
tme.mutable_ipv4()->set_ipv4port (ep.address.port());
tme.set_hops (ep.hops);
}
tm.set_version (1);
Message::pointer msg (
boost::make_shared <Message> (
tm, protocol::mtENDPOINTS));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
peer->sendPacket (msg, false);
}
}
void
OverlayImpl::disconnect (PeerFinder::Slot::ptr const& slot, bool graceful)
{
if (m_journal.trace) m_journal.trace <<
"Disconnect " << slot->remote_endpoint () <<
(graceful ? "gracefully" : "");
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
peer->close (graceful);
//peer->detach ("disc", false);
}
//--------------------------------------------------------------------------
//
// Stoppable
//
//--------------------------------------------------------------------------
void
OverlayImpl::onPrepare ()
{
PeerFinder::Config config;
if (getConfig ().PEERS_MAX != 0)
config.maxPeers = getConfig ().PEERS_MAX;
config.outPeers = config.calcOutPeers();
config.wantIncoming =
(! getConfig ().PEER_PRIVATE) &&
(getConfig().peerListeningPort != 0);
// if it's a private peer or we are running as standalone
// automatic connections would defeat the purpose.
config.autoConnect =
!getConfig().RUN_STANDALONE &&
!getConfig().PEER_PRIVATE;
config.listeningPort = getConfig().peerListeningPort;
config.features = "";
// Enforce business rules
config.applyTuning ();
m_peerFinder->setConfig (config);
auto bootstrapIps (getConfig ().IPS);
// If no IPs are specified, use the Ripple Labs round robin
// pool to get some servers to insert into the boot cache.
if (bootstrapIps.empty ())
bootstrapIps.push_back ("r.ripple.com 51235");
if (!bootstrapIps.empty ())
{
m_resolver.resolve (bootstrapIps,
[this](
std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
std::vector <std::string> ips;
for (auto const& addr : addresses)
ips.push_back (to_string (addr));
std::string const base ("config: ");
if (!ips.empty ())
m_peerFinder->addFallbackStrings (base + name, ips);
});
}
// Add the ips_fixed from the rippled.cfg file
if (! getConfig ().RUN_STANDALONE && !getConfig ().IPS_FIXED.empty ())
{
m_resolver.resolve (getConfig ().IPS_FIXED,
[this](
std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
if (!addresses.empty ())
m_peerFinder->addFixedPeer (name, addresses);
});
}
// Configure the peer doors, which allow the server to accept incoming
// peer connections:
if (! getConfig ().RUN_STANDALONE)
{
m_doorDirect = make_PeerDoor (
PeerDoor::sslRequired,
*this,
getConfig ().PEER_IP,
getConfig ().peerListeningPort,
m_io_service);
if (getConfig ().peerPROXYListeningPort != 0)
{
m_doorProxy = make_PeerDoor (
PeerDoor::sslAndPROXYRequired,
*this,
getConfig ().PEER_IP,
getConfig ().peerPROXYListeningPort,
m_io_service);
}
}
}
void
OverlayImpl::onStart ()
{
}
/** Close all peer connections.
If `graceful` is true then active
Requirements:
Caller must hold the mutex.
*/
void
OverlayImpl::close_all (bool graceful)
{
for (auto const& entry : m_peers)
{
PeerImp::ptr const peer (entry.second.lock());
// VFALCO The only case where the weak_ptr is expired should be if
// ~PeerImp is pre-empted before it calls m_peers.remove()
//
if (peer != nullptr)
peer->close (graceful);
}
}
void
OverlayImpl::onStop ()
{
if (m_doorDirect)
m_doorDirect->stop();
if (m_doorProxy)
m_doorProxy->stop();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Take off the extra count we added in the constructor
release();
close_all (false);
}
void
OverlayImpl::onChildrenStopped ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
check_stopped ();
}
//--------------------------------------------------------------------------
//
// PropertyStream
//
//--------------------------------------------------------------------------
void
OverlayImpl::onWrite (beast::PropertyStream::Map& stream)
{
}
//--------------------------------------------------------------------------
/** A peer has connected successfully
This is called after the peer handshake has been completed and during
peer activation. At this point, the peer address and the public key
are known.
*/
void
OverlayImpl::onPeerActivated (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Now track this peer
{
auto const result (m_shortIdMap.emplace (
std::piecewise_construct,
std::make_tuple (peer->getShortId()),
std::make_tuple (peer)));
assert(result.second);
}
{
auto const result (m_publicKeyMap.emplace (
boost::unordered::piecewise_construct,
boost::make_tuple (peer->getNodePublic()),
boost::make_tuple (peer)));
assert(result.second);
}
m_journal.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->getShortId() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
// We just accepted this peer so we have non-zero active peers
assert(size() != 0);
}
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
will not be called for outbound peer connections that don't succeed or
for connections of peers that are dropped prior to being activated.
*/
void
OverlayImpl::onPeerDisconnect (Peer::ptr const& peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
m_shortIdMap.erase (peer->getShortId ());
m_publicKeyMap.erase (peer->getNodePublic ());
}
/** The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
*/
std::size_t
OverlayImpl::size ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
return m_publicKeyMap.size ();
}
// Returns information on verified peers.
Json::Value
OverlayImpl::json ()
{
return foreach (get_peer_json());
}
Overlay::PeerSequence
OverlayImpl::getActivePeers ()
{
Overlay::PeerSequence ret;
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
ret.reserve (m_publicKeyMap.size ());
BOOST_FOREACH (PeerByPublicKey::value_type const& pair, m_publicKeyMap)
{
assert (!!pair.second);
ret.push_back (pair.second);
}
return ret;
}
Peer::ptr
OverlayImpl::findPeerByShortID (Peer::ShortId const& id)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeerByShortId::iterator const iter (
m_shortIdMap.find (id));
if (iter != m_shortIdMap.end ())
return iter->second;
return Peer::ptr();
}
//------------------------------------------------------------------------------
std::unique_ptr <Overlay>
make_Overlay (
beast::Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
{
return std::make_unique <OverlayImpl> (parent, resourceManager, siteFiles,
pathToDbFileOrDirectory, resolver, io_service, ssl_context);
}
}

View File

@@ -0,0 +1,226 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED
#define RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED
#include "../api/Overlay.h"
#include "../../ripple/common/Resolver.h"
#include "../../ripple/common/seconds_clock.h"
#include "../../ripple/peerfinder/api/Callback.h"
#include "../../ripple/peerfinder/api/Manager.h"
#include "../../ripple/resource/api/Manager.h"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/unordered_map.hpp>
#include "../../beast/beast/cxx14/memory.h" // <memory>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <unordered_map>
namespace ripple {
class PeerDoor;
class PeerImp;
class OverlayImpl
: public Overlay
, public PeerFinder::Callback
{
public:
typedef boost::asio::ip::tcp::socket socket_type;
typedef std::unordered_map <PeerFinder::Slot::ptr,
boost::weak_ptr <PeerImp>> PeersBySlot;
typedef boost::unordered_map <
RippleAddress, Peer::ptr> PeerByPublicKey;
typedef std::unordered_map <
Peer::ShortId, Peer::ptr> PeerByShortId;
std::recursive_mutex m_mutex;
// Blocks us until dependent objects have been destroyed
std::condition_variable_any m_cond;
// Number of dependencies that must be destroyed before we can stop
std::size_t m_child_count;
beast::Journal m_journal;
Resource::Manager& m_resourceManager;
std::unique_ptr <PeerFinder::Manager> m_peerFinder;
boost::asio::io_service& m_io_service;
boost::asio::ssl::context& m_ssl_context;
/** Associates slots to peers. */
PeersBySlot m_peers;
/** Tracks peers by their public key */
PeerByPublicKey m_publicKeyMap;
/** Tracks peers by their session ID */
PeerByShortId m_shortIdMap;
/** The peer door for regular SSL connections */
std::unique_ptr <PeerDoor> m_doorDirect;
/** The peer door for proxy connections */
std::unique_ptr <PeerDoor> m_doorProxy;
/** The resolver we use for peer hostnames */
Resolver& m_resolver;
/** Monotically increasing identifiers for peers */
beast::Atomic <Peer::ShortId> m_nextShortId;
//--------------------------------------------------------------------------
OverlayImpl (Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context);
~OverlayImpl ();
OverlayImpl (OverlayImpl const&) = delete;
OverlayImpl& operator= (OverlayImpl const&) = delete;
/** Process an incoming connection using the Peer protocol.
The caller transfers ownership of the socket via rvalue move.
@param proxyHandshake `true` If a PROXY handshake is required.
@param socket A socket in the accepted state.
*/
void
accept (bool proxyHandshake,
socket_type&& socket);
void
connect (beast::IP::Endpoint const& remote_endpoint);
Peer::ShortId
next_id();
//--------------------------------------------------------------------------
void
check_stopped ();
void
release ();
void
remove (PeerFinder::Slot::ptr const& slot);
//
// PeerFinder::Callback
//
void
connect (std::vector <beast::IP::Endpoint> const& list);
void
activate (PeerFinder::Slot::ptr const& slot);
void
send (PeerFinder::Slot::ptr const& slot,
std::vector <PeerFinder::Endpoint> const& endpoints);
void
disconnect (PeerFinder::Slot::ptr const& slot, bool graceful);
//
// Stoppable
//
void
onPrepare () override;
void
onStart () override;
/** Close all peer connections.
If `graceful` is true then active
Requirements:
Caller must hold the mutex.
*/
void
close_all (bool graceful);
void
onStop () override;
void
onChildrenStopped () override;
//
// PropertyStream
//
void
onWrite (beast::PropertyStream::Map& stream);
//--------------------------------------------------------------------------
/** A peer has connected successfully
This is called after the peer handshake has been completed and during
peer activation. At this point, the peer address and the public key
are known.
*/
void
onPeerActivated (Peer::ptr const& peer);
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
will not be called for outbound peer connections that don't succeed or
for connections of peers that are dropped prior to being activated.
*/
void
onPeerDisconnect (Peer::ptr const& peer);
/** The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
*/
std::size_t
size ();
// Returns information on verified peers.
Json::Value
json ();
Overlay::PeerSequence
getActivePeers ();
Peer::ptr
findPeerByShortID (Peer::ShortId const& id);
};
} // ripple
#endif

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include "OverlayImpl.h"
#include "PeerDoor.h"
namespace ripple {
@@ -27,16 +28,24 @@ class PeerDoorImp
: public PeerDoor
, public beast::LeakChecked <PeerDoorImp>
{
private:
OverlayImpl& m_overlay;
beast::Journal m_journal;
Kind m_kind;
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::deadline_timer m_acceptDelay;
NativeSocketType m_socket;
public:
PeerDoorImp (Kind kind, Peers& peers,
boost::asio::ip::tcp::endpoint const &ep,
boost::asio::io_service& io_service)
: PeerDoor (static_cast<Stoppable&>(peers))
, m_peers (peers)
PeerDoorImp (Kind kind, OverlayImpl& overlay,
boost::asio::ip::tcp::endpoint const &ep,
boost::asio::io_service& io_service)
: m_overlay (overlay)
, m_journal (LogPartition::getJournal <PeerDoor> ())
, m_kind (kind)
, m_acceptor (io_service, ep)
, m_acceptDelay (io_service)
, m_socket (io_service)
{
m_journal.info <<
"Listening on " <<
@@ -47,8 +56,18 @@ public:
async_accept ();
}
~PeerDoorImp ()
void
stop()
{
{
boost::system::error_code ec;
m_acceptDelay.cancel (ec);
}
{
boost::system::error_code ec;
m_acceptor.cancel (ec);
}
}
//--------------------------------------------------------------------------
@@ -57,14 +76,9 @@ public:
//
void async_accept ()
{
boost::shared_ptr <NativeSocketType> socket (
boost::make_shared <NativeSocketType> (
m_acceptor.get_io_service()));
m_acceptor.async_accept (*socket,
m_acceptor.async_accept (m_socket,
boost::bind (&PeerDoorImp::handleAccept, this,
boost::asio::placeholders::error,
socket));
boost::asio::placeholders::error));
}
//--------------------------------------------------------------------------
@@ -73,7 +87,7 @@ public:
//
void handleTimer (boost::system::error_code ec)
{
if (ec == boost::asio::error::operation_aborted || isStopping ())
if (ec == boost::asio::error::operation_aborted)
return;
async_accept ();
@@ -81,10 +95,9 @@ public:
// Called when the accept socket wait completes
//
void handleAccept (boost::system::error_code ec,
boost::shared_ptr <NativeSocketType> const& socket)
void handleAccept (boost::system::error_code ec)
{
if (ec == boost::asio::error::operation_aborted || isStopping ())
if (ec == boost::asio::error::operation_aborted)
return;
bool delay = false;
@@ -93,7 +106,7 @@ public:
{
bool const proxyHandshake (m_kind == sslAndPROXYRequired);
m_peers.accept (proxyHandshake, socket);
m_overlay.accept (proxyHandshake, std::move(m_socket));
}
else
{
@@ -103,6 +116,8 @@ public:
m_journal.info << "Error " << ec;
}
m_socket.close(ec);
if (delay)
{
m_acceptDelay.expires_from_now (boost::posix_time::milliseconds (500));
@@ -114,43 +129,13 @@ public:
async_accept ();
}
}
//--------------------------------------------------------------------------
void onStop ()
{
{
boost::system::error_code ec;
m_acceptDelay.cancel (ec);
}
{
boost::system::error_code ec;
m_acceptor.cancel (ec);
}
stopped ();
}
private:
Peers& m_peers;
beast::Journal m_journal;
Kind m_kind;
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::deadline_timer m_acceptDelay;
};
//------------------------------------------------------------------------------
PeerDoor::PeerDoor (Stoppable& parent)
: Stoppable ("PeerDoor", parent)
{
}
//------------------------------------------------------------------------------
std::unique_ptr<PeerDoor>
createPeerDoor (
PeerDoor::Kind kind, Peers& peers,
make_PeerDoor (
PeerDoor::Kind kind, OverlayImpl& overlay,
std::string const& ip, int port,
boost::asio::io_service& io_service)
{
@@ -161,7 +146,7 @@ createPeerDoor (
boost::asio::ip::address ().from_string (
ip.empty () ? "0.0.0.0" : ip), port);
return std::make_unique<PeerDoorImp>(kind, peers, ep, io_service);
return std::make_unique<PeerDoorImp>(kind, overlay, ep, io_service);
}
}

View File

@@ -20,29 +20,31 @@
#ifndef RIPPLE_PEERDOOR_H_INCLUDED
#define RIPPLE_PEERDOOR_H_INCLUDED
#include "OverlayImpl.h"
#include "../../beast/beast/cxx14/memory.h" // <memory>
namespace ripple {
/** Handles incoming connections from peers. */
class PeerDoor : public beast::Stoppable
class PeerDoor
{
protected:
explicit PeerDoor (Stoppable& parent);
public:
virtual ~PeerDoor () { }
virtual ~PeerDoor () = default;
enum Kind
{
sslRequired,
sslAndPROXYRequired
};
virtual
void stop() = 0;
};
std::unique_ptr <PeerDoor>
createPeerDoor (
PeerDoor::Kind kind, Peers& peers,
make_PeerDoor (
PeerDoor::Kind kind, OverlayImpl& overlay,
std::string const& ip, int port,
boost::asio::io_service& io_service);

View File

@@ -20,10 +20,25 @@
#ifndef RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
#define RIPPLE_OVERLAY_PEERIMP_H_INCLUDED
#include "../api/predicates.h"
#include "../../ripple/common/MultiSocket.h"
#include "../../ripple_data/protocol/Protocol.h"
#include "../ripple/validators/ripple_validators.h"
#include "../ripple/peerfinder/ripple_peerfinder.h"
#include "../ripple_app/misc/ProofOfWork.h"
#include "../ripple_app/misc/ProofOfWorkFactory.h"
// VFALCO This is unfortunate. Comment this out and
// just include what is needed.
#include "../ripple_app/ripple_app.h"
#include <cstdint>
namespace ripple {
typedef boost::asio::ip::tcp::socket NativeSocketType;
class PeerImp;
std::string to_string (Peer const& peer);
@@ -40,36 +55,10 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer);
//------------------------------------------------------------------------------
struct get_usable_peers
{
typedef Peers::PeerSequence return_type;
Peers::PeerSequence usablePeers;
uint256 const& txHash;
Peer const* skip;
get_usable_peers(uint256 const& hash, Peer const* s)
: txHash(hash), skip(s)
{ }
void operator() (Peer::ref peer)
{
if (peer->hasTxSet (txHash) && (peer.get () != skip))
usablePeers.push_back (peer);
}
return_type operator() ()
{
return usablePeers;
}
};
//------------------------------------------------------------------------------
class PeerImp
: public Peer
, public CountedObject <PeerImp>
, public boost::enable_shared_from_this <PeerImp>
, private beast::LeakChecked <Peer>
{
private:
/** Time alloted for a peer to send a HELLO message (DEPRECATED) */
@@ -127,7 +116,7 @@ private:
return;
}
getNativeSocket ().async_connect (
m_socket->next_layer <NativeSocketType>().async_connect (
beast::IPAddressConversion::to_asio_endpoint (m_remoteAddress),
m_strand.wrap (boost::bind (&PeerImp::onConnect,
shared_from_this (), boost::asio::placeholders::error)));
@@ -155,7 +144,7 @@ public:
typedef boost::shared_ptr <PeerImp> ptr;
boost::shared_ptr <NativeSocketType> m_shared_socket;
NativeSocketType m_owned_socket;
beast::Journal m_journal;
@@ -173,7 +162,7 @@ public:
//
Resource::Manager& m_resourceManager;
PeerFinder::Manager& m_peerFinder;
Peers& m_peers;
OverlayImpl& m_overlay;
bool m_inbound;
std::unique_ptr <MultiSocket> m_socket;
@@ -205,8 +194,8 @@ public:
boost::asio::deadline_timer m_timer;
std::vector<uint8_t> m_readBuffer;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
std::list<Message::pointer> mSendQ;
Message::pointer mSendingPacket;
protocol::TMStatusChange mLastStatus;
protocol::TMHello mHello;
@@ -221,31 +210,31 @@ public:
//--------------------------------------------------------------------------
/** New incoming peer from the specified socket */
PeerImp (
boost::shared_ptr <NativeSocketType> const& socket,
NativeSocketType&& socket,
beast::IP::Endpoint remoteAddress,
Peers& peers,
OverlayImpl& overlay,
Resource::Manager& resourceManager,
PeerFinder::Manager& peerFinder,
PeerFinder::Slot::ptr const& slot,
boost::asio::ssl::context& ssl_context,
MultiSocket::Flag flags)
: m_shared_socket (socket)
: m_owned_socket (std::move (socket))
, m_journal (LogPartition::getJournal <Peer> ())
, m_shortId (0)
, m_remoteAddress (remoteAddress)
, m_resourceManager (resourceManager)
, m_peerFinder (peerFinder)
, m_peers (peers)
, m_overlay (overlay)
, m_inbound (true)
, m_socket (MultiSocket::New (
*socket, ssl_context, flags.asBits ()))
, m_strand (socket->get_io_service())
m_owned_socket, ssl_context, flags.asBits ()))
, m_strand (m_owned_socket.get_io_service())
, m_state (stateConnected)
, m_detaching (false)
, m_clusterNode (false)
, m_minLedger (0)
, m_maxLedger (0)
, m_timer (socket->get_io_service())
, m_timer (m_owned_socket.get_io_service())
, m_slot (slot)
, m_was_canceled (false)
{
@@ -260,18 +249,19 @@ public:
PeerImp (
beast::IP::Endpoint remoteAddress,
boost::asio::io_service& io_service,
Peers& peers,
OverlayImpl& overlay,
Resource::Manager& resourceManager,
PeerFinder::Manager& peerFinder,
PeerFinder::Slot::ptr const& slot,
boost::asio::ssl::context& ssl_context,
MultiSocket::Flag flags)
: m_journal (LogPartition::getJournal <Peer> ())
: m_owned_socket (io_service)
, m_journal (LogPartition::getJournal <Peer> ())
, m_shortId (0)
, m_remoteAddress (remoteAddress)
, m_resourceManager (resourceManager)
, m_peerFinder (peerFinder)
, m_peers (peers)
, m_overlay (overlay)
, m_inbound (false)
, m_socket (MultiSocket::New (
io_service, ssl_context, flags.asBits ()))
@@ -287,15 +277,14 @@ public:
{
}
virtual ~PeerImp ()
virtual
~PeerImp ()
{
m_peers.remove (m_slot);
m_overlay.remove (m_slot);
}
NativeSocketType& getNativeSocket ()
{
return m_socket->next_layer <NativeSocketType> ();
}
PeerImp (PeerImp const&) = delete;
PeerImp& operator= (PeerImp const&) = delete;
MultiSocket& getStream ()
{
@@ -346,7 +335,7 @@ public:
m_peerFinder.on_closed (m_slot);
if (m_state == stateActive)
m_peers.onPeerDisconnect (shared_from_this ());
m_overlay.onPeerDisconnect (shared_from_this ());
m_state = stateGracefulClose;
@@ -442,7 +431,9 @@ public:
{
bassert (m_state == stateHandshaked);
m_state = stateActive;
m_peers.onPeerActivated(shared_from_this ());
bassert(m_shortId == 0);
m_shortId = m_overlay.next_id();
m_overlay.onPeerActivated(shared_from_this ());
}
void start ()
@@ -461,7 +452,7 @@ public:
//--------------------------------------------------------------------------
void sendPacket (const PackedMessage::pointer& packet, bool onStrand)
void sendPacket (const Message::pointer& packet, bool onStrand)
{
if (packet)
{
@@ -486,12 +477,12 @@ public:
void sendGetPeers ()
{
// Ask peer for known other peers.
protocol::TMGetPeers getPeers;
protocol::TMGetPeers msg;
getPeers.set_doweneedthis (1);
msg.set_doweneedthis (1);
PackedMessage::pointer packet = boost::make_shared<PackedMessage> (
getPeers, protocol::mtGET_PEERS);
Message::pointer packet = boost::make_shared<Message> (
msg, protocol::mtGET_PEERS);
sendPacket (packet, true);
}
@@ -502,6 +493,14 @@ public:
detach ("resource");
}
static void charge (boost::weak_ptr <Peer>& peer, Resource::Charge const& fee)
{
Peer::ptr p (peer.lock());
if (p != nullptr)
p->charge (fee);
}
Json::Value json ()
{
Json::Value ret (Json::objectValue);
@@ -617,13 +616,6 @@ public:
return false;
}
void setShortId(Peer::ShortId shortId)
{
bassert((m_shortId == 0) && (shortId != 0));
m_shortId = shortId;
}
Peer::ShortId getShortId () const
{
return m_shortId;
@@ -696,7 +688,7 @@ private:
if (!mSendQ.empty ())
{
PackedMessage::pointer packet = mSendQ.front ();
Message::pointer packet = mSendQ.front ();
if (packet)
{
@@ -722,7 +714,7 @@ private:
return;
}
unsigned msg_len = PackedMessage::getLength (m_readBuffer);
unsigned msg_len = Message::getLength (m_readBuffer);
// WRITEME: Compare to maximum message length, abort if too large
if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0))
@@ -824,7 +816,7 @@ private:
void processReadBuffer ()
{
// must not hold peer lock
int type = PackedMessage::getType (m_readBuffer);
int type = Message::getType (m_readBuffer);
LoadEvent::autoptr event (
getApp().getJobQueue ().getLoadEventAP (jtPEER, "Peer::read"));
@@ -848,7 +840,7 @@ private:
return;
}
size_t msgLen (m_readBuffer.size () - PackedMessage::kHeaderBytes);
size_t msgLen (m_readBuffer.size () - Message::kHeaderBytes);
switch (type)
{
@@ -857,7 +849,7 @@ private:
event->reName ("Peer::hello");
protocol::TMHello msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvHello (msg);
else
@@ -870,7 +862,7 @@ private:
event->reName ("Peer::cluster");
protocol::TMCluster msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvCluster (msg);
else
@@ -883,7 +875,7 @@ private:
event->reName ("Peer::errormessage");
protocol::TMErrorMsg msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvErrorMessage (msg);
else
@@ -896,7 +888,7 @@ private:
event->reName ("Peer::ping");
protocol::TMPing msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvPing (msg);
else
@@ -909,7 +901,7 @@ private:
event->reName ("Peer::getcontacts");
protocol::TMGetContacts msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvGetContacts (msg);
else
@@ -922,7 +914,7 @@ private:
event->reName ("Peer::contact");
protocol::TMContact msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvContact (msg);
else
@@ -935,7 +927,7 @@ private:
event->reName ("Peer::getpeers");
protocol::TMGetPeers msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvGetPeers (msg);
else
@@ -948,7 +940,7 @@ private:
event->reName ("Peer::peers");
protocol::TMPeers msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvPeers (msg);
else
@@ -961,7 +953,7 @@ private:
event->reName ("Peer::endpoints");
protocol::TMEndpoints msg;
if(msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if(msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvEndpoints (msg);
else
@@ -974,7 +966,7 @@ private:
event->reName ("Peer::searchtransaction");
protocol::TMSearchTransaction msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvSearchTransaction (msg);
else
@@ -987,7 +979,7 @@ private:
event->reName ("Peer::getaccount");
protocol::TMGetAccount msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvGetAccount (msg);
else
@@ -1000,7 +992,7 @@ private:
event->reName ("Peer::account");
protocol::TMAccount msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvAccount (msg);
else
@@ -1013,7 +1005,7 @@ private:
event->reName ("Peer::transaction");
protocol::TMTransaction msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvTransaction (msg);
else
@@ -1026,7 +1018,7 @@ private:
event->reName ("Peer::statuschange");
protocol::TMStatusChange msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvStatus (msg);
else
@@ -1040,7 +1032,7 @@ private:
boost::shared_ptr<protocol::TMProposeSet> msg (
boost::make_shared<protocol::TMProposeSet> ());
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvPropose (msg);
else
@@ -1054,7 +1046,7 @@ private:
boost::shared_ptr<protocol::TMGetLedger> msg (
boost::make_shared<protocol::TMGetLedger> ());
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvGetLedger (msg);
else
@@ -1068,7 +1060,7 @@ private:
boost::shared_ptr<protocol::TMLedgerData> msg (
boost::make_shared<protocol::TMLedgerData> ());
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvLedger (msg);
else
@@ -1081,7 +1073,7 @@ private:
event->reName ("Peer::haveset");
protocol::TMHaveTransactionSet msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvHaveTxSet (msg);
else
@@ -1095,7 +1087,7 @@ private:
boost::shared_ptr<protocol::TMValidation> msg (
boost::make_shared<protocol::TMValidation> ());
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvValidation (msg);
else
@@ -1108,7 +1100,7 @@ private:
{
protocol::TM msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes], msgLen))
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes], msgLen))
recv (msg);
else
m_journal.warning << "parse error: " << type;
@@ -1123,7 +1115,7 @@ private:
boost::shared_ptr<protocol::TMGetObjectByHash> msg =
boost::make_shared<protocol::TMGetObjectByHash> ();
if (msg->ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg->ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvGetObjectByHash (msg);
else
@@ -1136,7 +1128,7 @@ private:
event->reName ("Peer::proofofwork");
protocol::TMProofWork msg;
if (msg.ParseFromArray (&m_readBuffer[PackedMessage::kHeaderBytes],
if (msg.ParseFromArray (&m_readBuffer[Message::kHeaderBytes],
msgLen))
recvProofWork (msg);
else
@@ -1158,7 +1150,7 @@ private:
if (!m_detaching)
{
m_readBuffer.clear ();
m_readBuffer.resize (PackedMessage::kHeaderBytes);
m_readBuffer.resize (Message::kHeaderBytes);
boost::asio::async_read (getStream (),
boost::asio::buffer (m_readBuffer),
@@ -1171,17 +1163,17 @@ private:
void startReadBody (unsigned msg_len)
{
// The first PackedMessage::kHeaderBytes bytes of m_readbuf already
// The first Message::kHeaderBytes bytes of m_readbuf already
// contains the header. Expand it to fit in the body as well, and
// start async read into the body.
if (!m_detaching)
{
m_readBuffer.resize (PackedMessage::kHeaderBytes + msg_len);
m_readBuffer.resize (Message::kHeaderBytes + msg_len);
boost::asio::async_read (getStream (),
boost::asio::buffer (
&m_readBuffer [PackedMessage::kHeaderBytes], msg_len),
&m_readBuffer [Message::kHeaderBytes], msg_len),
m_strand.wrap (boost::bind (
&PeerImp::handleReadBody,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
@@ -1190,7 +1182,7 @@ private:
}
}
void sendPacketForce (const PackedMessage::pointer& packet)
void sendPacketForce (const Message::pointer& packet)
{
// must be on IO strand
if (!m_detaching)
@@ -1339,7 +1331,7 @@ private:
h.set_ledgerprevious (hash.begin (), hash.GetSerializeSize ());
}
PackedMessage::pointer packet = boost::make_shared<PackedMessage> (
Message::pointer packet = boost::make_shared<Message> (
h, protocol::mtHELLO);
sendPacket (packet, true);
@@ -1597,7 +1589,7 @@ private:
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
"recvValidation->checkValidation",
BIND_TYPE (
&PeerImp::checkValidation, P_1, &m_peers, val,
&PeerImp::checkValidation, P_1, &m_overlay, val,
isTrusted, m_clusterNode, packet,
boost::weak_ptr<Peer> (shared_from_this ())));
}
@@ -1638,7 +1630,7 @@ private:
// response with some data here anyways, and send if non-empty.
sendPacket (
boost::make_shared<PackedMessage> (peers, protocol::mtPEERS),
boost::make_shared<Message> (peers, protocol::mtPEERS),
true);
#endif
}
@@ -1761,7 +1753,7 @@ private:
" of " << packet.objects_size () <<
" for " << to_string (this);
sendPacket (
boost::make_shared<PackedMessage> (reply, protocol::mtGET_OBJECTS),
boost::make_shared<Message> (reply, protocol::mtGET_OBJECTS),
true);
}
else
@@ -1824,7 +1816,7 @@ private:
if (packet.type () == protocol::TMPing::ptPING)
{
packet.set_type (protocol::TMPing::ptPONG);
sendPacket (boost::make_shared<PackedMessage> (packet, protocol::mtPING), true);
sendPacket (boost::make_shared<Message> (packet, protocol::mtPING), true);
}
}
@@ -1847,7 +1839,7 @@ private:
void recvGetLedger (boost::shared_ptr<protocol::TMGetLedger> const& packet)
{
getApp().getJobQueue().addJob (jtPACK, "recvGetLedger",
std::bind (&sGetLedger, boost::weak_ptr<Peer> (shared_from_this ()), packet));
std::bind (&sGetLedger, boost::weak_ptr<PeerImp> (shared_from_this ()), packet));
}
void recvLedger (boost::shared_ptr<protocol::TMLedgerData> const& packet_ptr)
@@ -1862,12 +1854,12 @@ private:
if (packet.has_requestcookie ())
{
Peer::pointer target = m_peers.findPeerByShortID (packet.requestcookie ());
Peer::ptr target = m_overlay.findPeerByShortID (packet.requestcookie ());
if (target)
{
packet.clear_requestcookie ();
target->sendPacket (boost::make_shared<PackedMessage> (packet, protocol::mtLEDGER_DATA), false);
target->sendPacket (boost::make_shared<Message> (packet, protocol::mtLEDGER_DATA), false);
}
else
{
@@ -2067,7 +2059,7 @@ private:
getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose", BIND_TYPE (
&PeerImp::checkPropose, P_1, &m_peers, packet, proposal, consensusLCL,
&PeerImp::checkPropose, P_1, &m_overlay, packet, proposal, consensusLCL,
m_nodePublicKey, boost::weak_ptr<Peer> (shared_from_this ()), m_clusterNode));
}
@@ -2228,7 +2220,31 @@ private:
{
m_journal.debug << "Trying to route TX set request";
Peers::PeerSequence usablePeers (m_peers.foreach (
struct get_usable_peers
{
typedef Overlay::PeerSequence return_type;
Overlay::PeerSequence usablePeers;
uint256 const& txHash;
Peer const* skip;
get_usable_peers(uint256 const& hash, Peer const* s)
: txHash(hash), skip(s)
{ }
void operator() (Peer::ptr const& peer)
{
if (peer->hasTxSet (txHash) && (peer.get () != skip))
usablePeers.push_back (peer);
}
return_type operator() ()
{
return usablePeers;
}
};
Overlay::PeerSequence usablePeers (m_overlay.foreach (
get_usable_peers (txHash, this)));
if (usablePeers.empty ())
@@ -2237,10 +2253,10 @@ private:
return;
}
Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()];
Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()];
packet.set_requestcookie (getShortId ());
selectedPeer->sendPacket (
boost::make_shared<PackedMessage> (packet, protocol::mtGET_LEDGER),
boost::make_shared<Message> (packet, protocol::mtGET_LEDGER),
false);
return;
}
@@ -2297,9 +2313,9 @@ private:
if (packet.has_ledgerseq ())
seq = packet.ledgerseq ();
Peers::PeerSequence peerList = m_peers.getActivePeers ();
Peers::PeerSequence usablePeers;
BOOST_FOREACH (Peer::ref peer, peerList)
Overlay::PeerSequence peerList = m_overlay.getActivePeers ();
Overlay::PeerSequence usablePeers;
BOOST_FOREACH (Peer::ptr const& peer, peerList)
{
if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this))
usablePeers.push_back (peer);
@@ -2311,10 +2327,10 @@ private:
return;
}
Peer::ref selectedPeer = usablePeers[rand () % usablePeers.size ()];
Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()];
packet.set_requestcookie (getShortId ());
selectedPeer->sendPacket (
boost::make_shared<PackedMessage> (packet, protocol::mtGET_LEDGER), false);
boost::make_shared<Message> (packet, protocol::mtGET_LEDGER), false);
m_journal.debug << "Ledger request routed";
return;
}
@@ -2404,7 +2420,7 @@ private:
}
}
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage> (reply, protocol::mtLEDGER_DATA);
Message::pointer oPacket = boost::make_shared<Message> (reply, protocol::mtLEDGER_DATA);
sendPacket (oPacket, false);
return;
}
@@ -2488,15 +2504,17 @@ private:
}
}
PackedMessage::pointer oPacket = boost::make_shared<PackedMessage> (reply, protocol::mtLEDGER_DATA);
Message::pointer oPacket = boost::make_shared<Message> (reply, protocol::mtLEDGER_DATA);
sendPacket (oPacket, false);
}
// This is dispatched by the job queue
static void sGetLedger (boost::weak_ptr<Peer> wPeer,
boost::shared_ptr<protocol::TMGetLedger> packet)
static
void
sGetLedger (boost::weak_ptr<PeerImp> wPeer,
boost::shared_ptr <protocol::TMGetLedger> packet)
{
boost::shared_ptr<Peer> peer = wPeer.lock ();
boost::shared_ptr<PeerImp> peer = wPeer.lock ();
if (peer)
peer->getLedger (*packet);
@@ -2584,14 +2602,14 @@ private:
}
else
{
Peer::pointer pptr (peer.lock ());
Peer::ptr pptr (peer.lock ());
if (pptr)
{
protocol::TMProofWork reply;
reply.set_token (pow->getToken ());
reply.set_response (solution.begin (), solution.size ());
pptr->sendPacket (boost::make_shared<PackedMessage> (reply, protocol::mtPROOFOFWORK), false);
pptr->sendPacket (boost::make_shared<Message> (reply, protocol::mtPROOFOFWORK), false);
}
else
{
@@ -2612,7 +2630,7 @@ private:
getApp().getLedgerMaster().getValidLedgerIndex()))
{ // Transaction has expired
getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD);
Peer::charge (peer, Resource::feeUnwantedData);
charge (peer, Resource::feeUnwantedData);
return;
}
@@ -2623,7 +2641,7 @@ private:
if (tx->getStatus () == INVALID)
{
getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD);
Peer::charge (peer, Resource::feeInvalidSignature);
charge (peer, Resource::feeInvalidSignature);
return;
}
else
@@ -2636,14 +2654,14 @@ private:
catch (...)
{
getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD);
Peer::charge (peer, Resource::feeInvalidRequest);
charge (peer, Resource::feeInvalidRequest);
}
#endif
}
// Called from our JobQueue
static void checkPropose (Job& job, Peers* pPeers, boost::shared_ptr<protocol::TMProposeSet> packet,
static void checkPropose (Job& job, Overlay* pPeers, boost::shared_ptr<protocol::TMProposeSet> packet,
LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic,
boost::weak_ptr<Peer> peer, bool fromCluster)
{
@@ -2667,10 +2685,10 @@ private:
if (!fromCluster && !proposal->checkSign (set.signature ()))
{
Peer::pointer p = peer.lock ();
Peer::ptr p = peer.lock ();
WriteLog(lsWARNING, Peer) << "proposal with previous ledger fails sig check: " <<
*p;
Peer::charge (peer, Resource::feeInvalidSignature);
charge (peer, Resource::feeInvalidSignature);
return;
}
else
@@ -2705,7 +2723,7 @@ private:
proposal->getSuppressionID (), peers, SF_RELAYED))
{
pPeers->foreach (send_if_not (
boost::make_shared<PackedMessage> (set, protocol::mtPROPOSE_LEDGER),
boost::make_shared<Message> (set, protocol::mtPROPOSE_LEDGER),
peer_in_set(peers)));
}
}
@@ -2715,7 +2733,7 @@ private:
}
}
static void checkValidation (Job&, Peers* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster,
static void checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster,
boost::shared_ptr<protocol::TMValidation> packet, boost::weak_ptr<Peer> peer)
{
#ifndef TRUST_NETWORK
@@ -2727,12 +2745,12 @@ private:
if (!isCluster && !val->isValid (signingHash))
{
WriteLog(lsWARNING, Peer) << "Validation is invalid";
Peer::charge (peer, Resource::feeInvalidRequest);
charge (peer, Resource::feeInvalidRequest);
return;
}
std::string source;
Peer::pointer lp = peer.lock ();
Peer::ptr lp = peer.lock ();
if (lp)
source = to_string(*lp);
@@ -2757,7 +2775,7 @@ private:
getApp().getHashRouter ().swapSet (signingHash, peers, SF_RELAYED))
{
pPeers->foreach (send_if_not (
boost::make_shared<PackedMessage> (*packet, protocol::mtVALIDATION),
boost::make_shared<Message> (*packet, protocol::mtVALIDATION),
peer_in_set(peers)));
}
}
@@ -2766,7 +2784,7 @@ private:
catch (...)
{
WriteLog(lsTRACE, Peer) << "Exception processing validation";
Peer::charge (peer, Resource::feeInvalidRequest);
charge (peer, Resource::feeInvalidRequest);
}
#endif
}
@@ -2774,18 +2792,13 @@ private:
//------------------------------------------------------------------------------
void Peer::charge (boost::weak_ptr <Peer>& peer, Resource::Charge const& fee)
{
Peer::pointer p (peer.lock());
if (p != nullptr)
p->charge (fee);
}
const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15);
//------------------------------------------------------------------------------
std::string to_string (PeerImp const& peer)
// to_string should not be used we should just use lexical_cast maybe
inline std::string to_string (PeerImp const& peer)
{
if (peer.isInCluster())
return peer.getClusterNodeName();
@@ -2793,19 +2806,19 @@ std::string to_string (PeerImp const& peer)
return peer.getRemoteAddress().to_string();
}
std::string to_string (PeerImp const* peer)
inline std::string to_string (PeerImp const* peer)
{
return to_string (*peer);
}
std::ostream& operator<< (std::ostream& os, PeerImp const& peer)
inline std::ostream& operator<< (std::ostream& os, PeerImp const& peer)
{
os << to_string (peer);
return os;
}
std::ostream& operator<< (std::ostream& os, PeerImp const* peer)
inline std::ostream& operator<< (std::ostream& os, PeerImp const* peer)
{
os << to_string (peer);
@@ -2814,7 +2827,7 @@ std::ostream& operator<< (std::ostream& os, PeerImp const* peer)
//------------------------------------------------------------------------------
std::string to_string (Peer const& peer)
inline std::string to_string (Peer const& peer)
{
if (peer.isInCluster())
return peer.getClusterNodeName();
@@ -2822,19 +2835,19 @@ std::string to_string (Peer const& peer)
return peer.getRemoteAddress().to_string();
}
std::string to_string (Peer const* peer)
inline std::string to_string (Peer const* peer)
{
return to_string (*peer);
}
std::ostream& operator<< (std::ostream& os, Peer const& peer)
inline std::ostream& operator<< (std::ostream& os, Peer const& peer)
{
os << to_string (peer);
return os;
}
std::ostream& operator<< (std::ostream& os, Peer const* peer)
inline std::ostream& operator<< (std::ostream& os, Peer const* peer)
{
os << to_string (peer);

View File

@@ -1,637 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "PeerDoor.h"
#include "PeerImp.h"
#include "../../ripple/common/seconds_clock.h"
#include "../../beast/beast/ByteOrder.h"
#include <boost/config.hpp>
#include <cassert>
#include <condition_variable>
#include <mutex>
namespace ripple {
SETUP_LOG (Peer)
class PeersLog;
template <> char const* LogPartition::getPartitionName <PeersLog> () { return "Peers"; }
class PeerFinderLog;
template <> char const* LogPartition::getPartitionName <PeerFinderLog> () { return "PeerFinder"; }
class NameResolverLog;
template <> char const* LogPartition::getPartitionName <NameResolverLog> () { return "NameResolver"; }
/** Calls a function during static initialization. */
struct static_call
{
// Function must be callable as
// void f (void) const
//
template <class Function>
static_call (Function const& f)
{
f ();
}
};
static static_call init_PeerFinderLog (&LogPartition::get <PeerFinderLog>);
static static_call init_NameResolverLog (&LogPartition::get <NameResolverLog>);
//------------------------------------------------------------------------------
/** A functor to visit all active peers and retrieve their JSON data */
struct get_peer_json
{
typedef Json::Value return_type;
Json::Value json;
get_peer_json ()
{ }
void operator() (Peer::ref peer)
{
json.append (peer->json ());
}
Json::Value operator() ()
{
return json;
}
};
//------------------------------------------------------------------------------
class PeersImp
: public Peers
, public PeerFinder::Callback
, public beast::LeakChecked <PeersImp>
{
public:
typedef std::unordered_map <PeerFinder::Slot::ptr,
boost::weak_ptr <PeerImp>> PeersBySlot;
typedef boost::unordered_map <
RippleAddress, Peer::pointer> PeerByPublicKey;
typedef boost::unordered_map <
Peer::ShortId, Peer::pointer> PeerByShortId;
std::recursive_mutex m_mutex;
// Blocks us until dependent objects have been destroyed
std::condition_variable_any m_cond;
// Number of dependencies that must be destroyed before we can stop
std::size_t m_child_count;
beast::Journal m_journal;
Resource::Manager& m_resourceManager;
std::unique_ptr <PeerFinder::Manager> m_peerFinder;
boost::asio::io_service& m_io_service;
boost::asio::ssl::context& m_ssl_context;
/** Associates slots to peers. */
PeersBySlot m_peers;
/** Tracks peers by their public key */
PeerByPublicKey m_publicKeyMap;
/** Tracks peers by their session ID */
PeerByShortId m_shortIdMap;
/** The peer door for regular SSL connections */
std::unique_ptr <PeerDoor> m_doorDirect;
/** The peer door for proxy connections */
std::unique_ptr <PeerDoor> m_doorProxy;
/** The resolver we use for peer hostnames */
Resolver& m_resolver;
/** Monotically increasing identifiers for peers */
beast::Atomic <Peer::ShortId> m_nextShortId;
//--------------------------------------------------------------------------
//
// Peers
//
//--------------------------------------------------------------------------
PeersImp (Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
: Peers (parent)
, m_child_count (1)
, m_journal (LogPartition::getJournal <PeersLog> ())
, m_resourceManager (resourceManager)
, m_peerFinder (add (PeerFinder::Manager::New (
*this,
siteFiles,
pathToDbFileOrDirectory,
*this,
get_seconds_clock (),
LogPartition::getJournal <PeerFinderLog> ())))
, m_io_service (io_service)
, m_ssl_context (ssl_context)
, m_resolver (resolver)
{
}
~PeersImp ()
{
// Block until dependent objects have been destroyed.
// This is just to catch improper use of the Stoppable API.
//
std::unique_lock <decltype(m_mutex)> lock (m_mutex);
m_cond.wait (lock, [this] {
return this->m_child_count == 0; });
}
void accept (
bool proxyHandshake,
boost::shared_ptr <NativeSocketType> const& socket)
{
// An error getting an endpoint means the connection closed.
// Just do nothing and the socket will be closed by the caller.
boost::system::error_code ec;
auto const local_endpoint_native (socket->local_endpoint (ec));
if (ec)
return;
auto const remote_endpoint_native (socket->remote_endpoint (ec));
if (ec)
return;
auto const local_endpoint (
beast::IPAddressConversion::from_asio (local_endpoint_native));
auto const remote_endpoint (
beast::IPAddressConversion::from_asio (remote_endpoint_native));
PeerFinder::Slot::ptr const slot (m_peerFinder->new_inbound_slot (
local_endpoint, remote_endpoint));
if (slot == nullptr)
return;
MultiSocket::Flag flags (
MultiSocket::Flag::server_role | MultiSocket::Flag::ssl_required);
if (proxyHandshake)
flags = flags.with (MultiSocket::Flag::proxy);
PeerImp::ptr const peer (boost::make_shared <PeerImp> (
socket, remote_endpoint, *this, m_resourceManager, *m_peerFinder,
slot, m_ssl_context, flags));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
}
++m_child_count;
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
}
}
void connect (beast::IP::Endpoint const& remote_endpoint)
{
if (isStopping())
{
m_journal.debug <<
"Skipping " << remote_endpoint <<
" connect on stop";
return;
}
PeerFinder::Slot::ptr const slot (
m_peerFinder->new_outbound_slot (remote_endpoint));
if (slot == nullptr)
return;
MultiSocket::Flag const flags (
MultiSocket::Flag::client_role | MultiSocket::Flag::ssl);
PeerImp::ptr const peer (boost::make_shared <PeerImp> (
remote_endpoint, m_io_service, *this, m_resourceManager,
*m_peerFinder, slot, m_ssl_context, flags));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
{
std::pair <PeersBySlot::iterator, bool> const result (
m_peers.emplace (slot, peer));
assert (result.second);
}
++m_child_count;
// This has to happen while holding the lock,
// otherwise the socket might not be canceled during a stop.
peer->start ();
}
}
//--------------------------------------------------------------------------
// Check for the stopped condition
// Caller must hold the mutex
void check_stopped ()
{
// To be stopped, child Stoppable objects must be stopped
// and the count of dependent objects must be zero
if (areChildrenStopped () && m_child_count == 0)
{
m_cond.notify_all ();
m_journal.info <<
"Stopped.";
stopped ();
}
}
// Decrement the count of dependent objects
// Caller must hold the mutex
void release ()
{
if (--m_child_count == 0)
check_stopped ();
}
void remove (PeerFinder::Slot::ptr const& slot)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
m_peers.erase (iter);
release();
}
//--------------------------------------------------------------------------
//
// PeerFinder::Callback
//
//--------------------------------------------------------------------------
void connect (std::vector <beast::IP::Endpoint> const& list)
{
for (std::vector <beast::IP::Endpoint>::const_iterator iter (list.begin());
iter != list.end(); ++iter)
connect (*iter);
}
void activate (PeerFinder::Slot::ptr const& slot)
{
m_journal.trace <<
"Activate " << slot->remote_endpoint();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
peer->activate ();
}
void send (PeerFinder::Slot::ptr const& slot,
std::vector <PeerFinder::Endpoint> const& endpoints)
{
typedef std::vector <PeerFinder::Endpoint> List;
protocol::TMEndpoints tm;
for (List::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
{
PeerFinder::Endpoint const& ep (*iter);
protocol::TMEndpoint& tme (*tm.add_endpoints());
if (ep.address.is_v4())
tme.mutable_ipv4()->set_ipv4(
beast::toNetworkByteOrder (ep.address.to_v4().value));
else
tme.mutable_ipv4()->set_ipv4(0);
tme.mutable_ipv4()->set_ipv4port (ep.address.port());
tme.set_hops (ep.hops);
}
tm.set_version (1);
PackedMessage::pointer msg (
boost::make_shared <PackedMessage> (
tm, protocol::mtENDPOINTS));
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
peer->sendPacket (msg, false);
}
}
void disconnect (PeerFinder::Slot::ptr const& slot, bool graceful)
{
if (m_journal.trace) m_journal.trace <<
"Disconnect " << slot->remote_endpoint () <<
(graceful ? "gracefully" : "");
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeersBySlot::iterator const iter (m_peers.find (slot));
assert (iter != m_peers.end ());
PeerImp::ptr const peer (iter->second.lock());
assert (peer != nullptr);
peer->close (graceful);
//peer->detach ("disc", false);
}
//--------------------------------------------------------------------------
//
// Stoppable
//
//--------------------------------------------------------------------------
void onPrepare ()
{
PeerFinder::Config config;
if (getConfig ().PEERS_MAX != 0)
config.maxPeers = getConfig ().PEERS_MAX;
config.outPeers = config.calcOutPeers();
config.wantIncoming =
(! getConfig ().PEER_PRIVATE) &&
(getConfig().peerListeningPort != 0);
// if it's a private peer or we are running as standalone
// automatic connections would defeat the purpose.
config.autoConnect =
!getConfig().RUN_STANDALONE &&
!getConfig().PEER_PRIVATE;
config.listeningPort = getConfig().peerListeningPort;
config.features = "";
// Enforce business rules
config.applyTuning ();
m_peerFinder->setConfig (config);
auto bootstrapIps (getConfig ().IPS);
// If no IPs are specified, use the Ripple Labs round robin
// pool to get some servers to insert into the boot cache.
if (bootstrapIps.empty ())
bootstrapIps.push_back ("r.ripple.com 51235");
if (!bootstrapIps.empty ())
{
m_resolver.resolve (bootstrapIps,
[this](
std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
std::vector <std::string> ips;
for (auto const& addr : addresses)
ips.push_back (to_string (addr));
std::string const base ("config: ");
if (!ips.empty ())
m_peerFinder->addFallbackStrings (base + name, ips);
});
}
// Add the ips_fixed from the rippled.cfg file
if (! getConfig ().RUN_STANDALONE && !getConfig ().IPS_FIXED.empty ())
{
m_resolver.resolve (getConfig ().IPS_FIXED,
[this](
std::string const& name,
std::vector <beast::IP::Endpoint> const& addresses)
{
if (!addresses.empty ())
m_peerFinder->addFixedPeer (name, addresses);
});
}
// Configure the peer doors, which allow the server to accept incoming
// peer connections:
if (! getConfig ().RUN_STANDALONE)
{
m_doorDirect = createPeerDoor (
PeerDoor::sslRequired,
*this,
getConfig ().PEER_IP,
getConfig ().peerListeningPort,
m_io_service);
if (getConfig ().peerPROXYListeningPort != 0)
{
m_doorProxy = createPeerDoor (
PeerDoor::sslAndPROXYRequired,
*this,
getConfig ().PEER_IP,
getConfig ().peerPROXYListeningPort,
m_io_service);
}
}
}
void onStart ()
{
}
/** Close all peer connections.
If `graceful` is true then active
Requirements:
Caller must hold the mutex.
*/
void close_all (bool graceful)
{
for (auto const& entry : m_peers)
{
PeerImp::ptr const peer (entry.second.lock());
// VFALCO The only case where the weak_ptr is expired should be if
// ~PeerImp is pre-empted before it calls m_peers.remove()
//
if (peer != nullptr)
peer->close (graceful);
}
}
void onStop ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Take off the extra count we added in the constructor
release();
close_all (false);
}
void onChildrenStopped ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
check_stopped ();
}
//--------------------------------------------------------------------------
//
// PropertyStream
//
//--------------------------------------------------------------------------
void onWrite (beast::PropertyStream::Map& stream)
{
}
//--------------------------------------------------------------------------
/** A peer has connected successfully
This is called after the peer handshake has been completed and during
peer activation. At this point, the peer address and the public key
are known.
*/
void onPeerActivated (Peer::ref peer)
{
// First assign this peer a new short ID
peer->setShortId(++m_nextShortId);
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Now track this peer
std::pair<PeerByShortId::iterator, bool> idResult(
m_shortIdMap.emplace (
boost::unordered::piecewise_construct,
boost::make_tuple (peer->getShortId()),
boost::make_tuple (peer)));
assert(idResult.second);
std::pair<PeerByPublicKey::iterator, bool> keyResult(
m_publicKeyMap.emplace (
boost::unordered::piecewise_construct,
boost::make_tuple (peer->getNodePublic()),
boost::make_tuple (peer)));
assert(keyResult.second);
m_journal.debug <<
"activated " << peer->getRemoteAddress() <<
" (" << peer->getShortId() <<
":" << RipplePublicKey(peer->getNodePublic()) << ")";
// We just accepted this peer so we have non-zero active peers
assert(size() != 0);
}
/** A peer is being disconnected
This is called during the disconnection of a known, activated peer. It
will not be called for outbound peer connections that don't succeed or
for connections of peers that are dropped prior to being activated.
*/
void onPeerDisconnect (Peer::ref peer)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
m_shortIdMap.erase (peer->getShortId ());
m_publicKeyMap.erase (peer->getNodePublic ());
}
/** The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
*/
std::size_t size ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
return m_publicKeyMap.size ();
}
// Returns information on verified peers.
Json::Value json ()
{
return foreach (get_peer_json());
}
Peers::PeerSequence getActivePeers ()
{
Peers::PeerSequence ret;
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
ret.reserve (m_publicKeyMap.size ());
BOOST_FOREACH (PeerByPublicKey::value_type const& pair, m_publicKeyMap)
{
assert (!!pair.second);
ret.push_back (pair.second);
}
return ret;
}
Peer::pointer findPeerByShortID (Peer::ShortId const& id)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
PeerByShortId::iterator const iter (
m_shortIdMap.find (id));
if (iter != m_shortIdMap.end ())
return iter->second;
return Peer::pointer();
}
};
//------------------------------------------------------------------------------
Peers::~Peers ()
{
}
Peers* Peers::New (
Stoppable& parent,
Resource::Manager& resourceManager,
SiteFiles::Manager& siteFiles,
beast::File const& pathToDbFileOrDirectory,
Resolver& resolver,
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context)
{
return new PeersImp (parent, resourceManager, siteFiles,
pathToDbFileOrDirectory, resolver, io_service, ssl_context);
}
}

View File

@@ -19,17 +19,8 @@
#include "../../BeastConfig.h"
#include "ripple_overlay.h"
#include "../ripple_app/ripple_app.h"
#include "../ripple/validators/ripple_validators.h"
#include "../ripple/peerfinder/ripple_peerfinder.h"
#include "../ripple_app/misc/ProofOfWork.h"
#include "../ripple_app/misc/ProofOfWorkFactory.h"
#include "impl/PackedMessage.cpp"
#include "impl/Message.cpp"
#include "impl/OverlayImpl.cpp"
#include "impl/PeerImp.h"
#include "impl/PeerDoor.cpp"
#include "impl/Peers.cpp"

View File

@@ -19,11 +19,4 @@
#ifndef RIPPLE_OVERLAY_H_INCLUDED
#define RIPPLE_OVERLAY_H_INCLUDED
#include "../ripple_data/ripple_data.h"
#include "api/PackedMessage.h"
#include "api/Peer.h"
#include "api/Peers.h"
#endif