Add PeerFinder::Checker for testing endpoints

This commit is contained in:
Vinnie Falco
2013-10-04 19:11:10 -07:00
parent 625780621b
commit bb29c8ba85
25 changed files with 919 additions and 221 deletions

View File

@@ -72,6 +72,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\json\ripple_json.cpp" />
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Checker.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Config.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -1635,7 +1641,11 @@
<ClInclude Include="..\..\src\ripple\peerfinder\api\Endpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\api\Manager.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\api\Types.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\EndpointCache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpoint.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpointCache.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Logic.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\PeerInfo.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Slots.h" />
@@ -1643,6 +1653,7 @@
<ClInclude Include="..\..\src\ripple\peerfinder\impl\SourceStrings.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Store.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\StoreSqdb.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Tuning.h" />
<ClInclude Include="..\..\src\ripple\peerfinder\ripple_peerfinder.h" />
<ClInclude Include="..\..\src\ripple\rpc\api\Handler.h" />
<ClInclude Include="..\..\src\ripple\rpc\api\Manager.h" />
@@ -1672,6 +1683,7 @@
<ClInclude Include="..\..\src\ripple\types\api\base_uint.h" />
<ClInclude Include="..\..\src\ripple\types\api\Blob.h" />
<ClInclude Include="..\..\src\ripple\types\api\ByteOrder.h" />
<ClInclude Include="..\..\src\ripple\types\api\CycledSet.h" />
<ClInclude Include="..\..\src\ripple\types\api\IdentifierStorage.h" />
<ClInclude Include="..\..\src\ripple\types\api\IdentifierType.h" />
<ClInclude Include="..\..\src\ripple\types\api\HashMaps.h" />

View File

@@ -1089,6 +1089,9 @@
<ClCompile Include="..\..\src\ripple\json\impl\Tests.cpp">
<Filter>[1] Ripple\json\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\peerfinder\impl\Checker.cpp">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\KeyCache.h">
@@ -2196,6 +2199,24 @@
<ClInclude Include="..\..\src\ripple\validators\impl\Count.h">
<Filter>[1] Ripple\validators\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Checker.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\Tuning.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpoint.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\types\api\CycledSet.h">
<Filter>[1] Ripple\types\api</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\LegacyEndpointCache.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\peerfinder\impl\CheckerAdapter.h">
<Filter>[1] Ripple\peerfinder\impl</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -20,9 +20,6 @@
#ifndef RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED
#define RIPPLE_PEERFINDER_CALLBACK_H_INCLUDED
#include "Endpoint.h"
#include "Types.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -20,8 +20,6 @@
#ifndef RIPPLE_PEERFINDER_ENDPOINT_H_INCLUDED
#define RIPPLE_PEERFINDER_ENDPOINT_H_INCLUDED
#include "Types.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -0,0 +1,183 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
namespace ripple {
namespace PeerFinder {
class CheckerImp
: public Checker
, private Thread
, private LeakChecked <CheckerImp>
{
private:
class Request;
struct State
{
List <Request> list;
};
typedef SharedData <State> SharedState;
SharedState m_state;
boost::asio::io_service m_io_service;
boost::optional <boost::asio::io_service::work> m_work;
//--------------------------------------------------------------------------
static boost::asio::ip::tcp::endpoint fromIPEndpoint (
IPEndpoint const& ipEndpoint)
{
if (ipEndpoint.isV4 ())
{
return boost::asio::ip::tcp::endpoint (
boost::asio::ip::address_v4 (
ipEndpoint.v4().value),
ipEndpoint.port ());
}
bassertfalse;
return boost::asio::ip::tcp::endpoint ();
}
//--------------------------------------------------------------------------
class Request
: public SharedObject
, public List <Request>::Node
, private LeakChecked <Request>
{
public:
typedef SharedPtr <Request> Ptr;
typedef boost::asio::ip::tcp Protocol;
typedef boost::system::error_code error_code;
typedef Protocol::socket socket_type;
typedef Protocol::endpoint endpoint_type;
CheckerImp& m_owner;
boost::asio::io_service& m_io_service;
IPEndpoint m_address;
AbstractHandler <void (Result)> m_handler;
socket_type m_socket;
boost::system::error_code m_error;
bool m_canAccept;
Request (CheckerImp& owner, boost::asio::io_service& io_service,
IPEndpoint const& address, AbstractHandler <void (Result)> handler)
: m_owner (owner)
, m_io_service (io_service)
, m_address (address)
, m_handler (handler)
, m_socket (m_io_service)
, m_canAccept (false)
{
m_owner.add (*this);
m_socket.async_connect (fromIPEndpoint (m_address),
wrapHandler (boost::bind (&Request::handle_connect, Ptr(this),
boost::asio::placeholders::error), m_handler));
}
~Request ()
{
Result result;
result.address = m_address;
result.error = m_error;
m_io_service.wrap (m_handler) (result);
m_owner.remove (*this);
}
void cancel ()
{
m_socket.cancel();
}
void handle_connect (boost::system::error_code ec)
{
m_error = ec;
if (ec)
return;
m_canAccept = true;
}
};
//--------------------------------------------------------------------------
void add (Request& request)
{
SharedState::Access state (m_state);
state->list.push_back (request);
}
void remove (Request& request)
{
SharedState::Access state (m_state);
state->list.erase (state->list.iterator_to (request));
}
void run ()
{
m_io_service.run ();
}
public:
CheckerImp ()
: Thread ("PeerFinder::Checker")
, m_work (boost::in_place (boost::ref (m_io_service)))
{
startThread ();
}
~CheckerImp ()
{
// cancel pending i/o
cancel();
// destroy the io_service::work object
m_work = boost::none;
// signal and wait for the thread to exit gracefully
stopThread ();
}
void cancel ()
{
SharedState::Access state (m_state);
for (List <Request>::iterator iter (state->list.begin());
iter != state->list.end(); ++iter)
iter->cancel();
}
void async_test (IPEndpoint const& endpoint,
AbstractHandler <void (Result)> handler)
{
new Request (*this, m_io_service, endpoint, handler);
}
};
//------------------------------------------------------------------------------
Checker* Checker::New ()
{
return new CheckerImp;
}
}
}

View File

@@ -0,0 +1,89 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_CHECKER_H_INCLUDED
#define RIPPLE_PEERFINDER_CHECKER_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Tests remote listening sockets to make sure they are connectible. */
class Checker
{
public:
/** Create the service.
This will automatically start the associated thread and io_service.
*/
static Checker* New ();
/** Destroy the service.
Any pending I/O operations will be canceled. This call blocks until
all pending operations complete (either with success or with
operation_aborted) and the associated thread and io_service have
no more work remaining.
*/
virtual ~Checker () { }
/** Cancel pending I/O.
This issues cancel orders for all pending I/O operations and then
returns immediately. Handlers will receive operation_aborted errors,
or if they were already queued they will complete normally.
*/
virtual void cancel () = 0;
struct Result
{
Result ()
: canAccept (false)
{ }
/** The original address. */
IPEndpoint address;
/** The error code from the operation. */
boost::system::error_code error;
/** `true` if the endpoint is reachable, else `false`.
Only defined if no error occurred.
*/
bool canAccept;
};
/** Performs an async connection test on the specified endpoint.
The port must be non-zero.
Handler will be called with this signature:
void (Result const& result);
*/
template <typename Handler>
void async_test (IPEndpoint const& endpoint,
BEAST_MOVE_ARG(Handler) handler)
{
async_test (endpoint,
AbstractHandler <void (Result)> (
BEAST_MOVE_CAST(Handler)(handler)));
}
virtual void async_test (IPEndpoint const& endpoint,
AbstractHandler <void (Result)> handler) = 0;
};
}
}
#endif

View File

@@ -0,0 +1,83 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED
#define RIPPLE_PEERFINDER_CHECKERADAPTER_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** Adapts a ServiceQueue to dispatch Checker handler completions.
This lets the Logic have its Checker handler get dispatched
on the ServiceQueue instead of an io_service thread. Otherwise,
Logic would need a ServiceQueue to dispatch from its handler.
*/
class CheckerAdapter : public Checker
{
private:
ServiceQueue& m_queue;
ScopedPointer <Checker> m_checker;
struct Handler
{
ServiceQueue* m_queue;
AbstractHandler <void (Checker::Result)> m_handler;
Handler (
ServiceQueue& queue,
AbstractHandler <void (Checker::Result)> handler)
: m_queue (&queue)
, m_handler (handler)
{ }
void operator() (Checker::Result result)
{
m_queue->wrap (m_handler) (result);
}
};
public:
explicit CheckerAdapter (ServiceQueue& queue)
: m_queue (queue)
, m_checker (Checker::New())
{
}
~CheckerAdapter ()
{
// Have to do this before other fields get destroyed
m_checker = nullptr;
}
void cancel ()
{
m_checker->cancel();
}
void async_test (IPEndpoint const& endpoint,
AbstractHandler <void (Checker::Result)> handler)
{
m_checker->async_test (endpoint, Handler (m_queue, handler));
}
};
}
}
#endif

View File

@@ -17,8 +17,6 @@
*/
//==============================================================================
#include "../api/Config.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -20,10 +20,6 @@
#ifndef RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED
#define RIPPLE_PEERFINDER_ENDPOINTCACHE_H_INCLUDED
#include "../../ripple/types/api/AgedHistory.h"
#include "../api/Types.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_LEGACYENDPOINT_H_INCLUDED
#define RIPPLE_PEERFINDER_LEGACYENDPOINT_H_INCLUDED
namespace ripple {
namespace PeerFinder {
struct LegacyEndpoint
{
LegacyEndpoint ()
: checked (false)
, canAccept (false)
{ }
LegacyEndpoint (IPEndpoint const& address_)
: address (address_)
{ }
IPEndpoint address;
// When we last gave the endpoint out for connection attempts
RelativeTime mutable lastGet;
// True if we ever tried to connect
bool mutable checked;
// The result of the last connect attempt
bool mutable canAccept;
};
}
}
#endif

View File

@@ -0,0 +1,125 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_LEGACYENDPOINTCACHE_H_INCLUDED
#define RIPPLE_PEERFINDER_LEGACYENDPOINTCACHE_H_INCLUDED
namespace ripple {
namespace PeerFinder {
/** A container for managing the cache of legacy endpoints. */
class LegacyEndpointCache
{
private:
typedef boost::multi_index_container <
LegacyEndpoint, boost::multi_index::indexed_by <
boost::multi_index::hashed_unique <
BOOST_MULTI_INDEX_MEMBER(PeerFinder::LegacyEndpoint,IPEndpoint,address),
IPEndpoint::hasher>
>
> MapType;
MapType m_map;
public:
typedef std::vector <LegacyEndpoint const*> FlattenedList;
LegacyEndpointCache ()
{
}
~LegacyEndpointCache ()
{
}
/** Attempt to insert the endpoint.
The caller is responsible for making sure the address is valid.
The return value provides a reference to the new or existing endpoint.
The bool indicates whether or not the insertion took place.
*/
std::pair <LegacyEndpoint&, bool> insert (IPEndpoint const& address)
{
std::pair <MapType::iterator, bool> result (
m_map.insert (LegacyEndpoint (address)));
return std::make_pair (*result.first, result.second);
}
/** Returns a pointer to the legacy endpoint or nullptr. */
LegacyEndpoint const* find (IPEndpoint const& address)
{
MapType::iterator iter (m_map.find (address));
if (iter != m_map.end())
return &*iter;
return nullptr;
}
/** Updates the metadata following a connection attempt.
@param canAccept A flag indicating if the connection succeeded.
*/
void checked (IPEndpoint const& address, bool canAccept)
{
LegacyEndpoint const* endpoint (find (address));
if (endpoint != nullptr)
{
endpoint->checked = true;
endpoint->canAccept = canAccept;
}
}
struct Compare
{
bool operator() (LegacyEndpoint const* lhs,
LegacyEndpoint const* rhs) const
{
return lhs->lastGet < rhs->lastGet;
}
};
/** Appends up to n addresses for establishing outbound peers. */
void get (std::size_t n, std::vector <IPEndpoint>& result) const
{
FlattenedList list (flatten());
std::random_shuffle (list.begin(), list.end());
std::sort (list.begin(), list.end(), Compare());
n = std::min (n, list.size());
RelativeTime const now (RelativeTime::fromStartup());
for (FlattenedList::iterator iter (list.begin());
n-- && iter!=list.end(); ++iter)
{
result.push_back ((*iter)->address);
(*iter)->lastGet = now;
}
}
/** Returns a flattened array of pointers to the legacy endpoints. */
FlattenedList flatten () const
{
FlattenedList list;
list.reserve (m_map.size());
for (MapType::iterator iter (m_map.begin());
iter != m_map.end(); ++iter)
list.push_back (&*iter);
return list;
}
};
}
}
#endif

View File

@@ -20,45 +20,9 @@
#ifndef RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
#define RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
#include "../../ripple/types/api/AgedHistory.h"
#include "PeerInfo.h"
#include "Slots.h"
#include "Store.h"
#include <set>
#include "beast/modules/beast_core/system/BeforeBoost.h"
#include <boost/regex.hpp>
#include <boost/unordered_set.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
namespace ripple {
namespace PeerFinder {
// Tunable constants
enum
{
// How often we will try to make outgoing connections
secondsPerConnect = 10,
// How often we send or accept mtENDPOINTS messages per peer
secondsPerEndpoints = 5,
// How many Endpoint to send in each mtENDPOINTS
numberOfEndpoints = 10,
// The most Endpoint we will accept in mtENDPOINTS
numberOfEndpointsMax = 20,
// How many legacy endpoints to keep in our cache
numberOfLegacyEndpoints = 1000,
// How often legacy endpoints are updated in the database
legacyEndpointUpdateSeconds = 60 * 60
};
//--------------------------------------------------------------------------
/*
@@ -89,8 +53,6 @@ inline bool operator== (EndpointInfo const& lhs, EndpointInfo const& rhs)
return lhs.endpoint == rhs.endpoint;
}
typedef AgedHistory <std::set <IPEndpoint> > LegacyEndpoints;
//--------------------------------------------------------------------------
typedef boost::multi_index_container <
@@ -138,6 +100,7 @@ public:
Callback& m_callback;
Store& m_store;
Checker& m_checker;
Journal m_journal;
Config m_config;
@@ -150,16 +113,21 @@ public:
// Our view of the current set of connected peers.
Peers m_peers;
LegacyEndpoints m_legacyEndpoints;
bool m_legacyEndpointsDirty;
LegacyEndpointCache m_legacyCache;
bool m_legacyCacheDirty;
//----------------------------------------------------------------------
Logic (Callback& callback, Store& store, Journal journal)
Logic (
Callback& callback,
Store& store,
Checker& checker,
Journal journal)
: m_callback (callback)
, m_store (store)
, m_checker (checker)
, m_journal (journal)
, m_legacyEndpointsDirty (false)
, m_legacyCacheDirty (false)
{
}
@@ -174,8 +142,7 @@ public:
m_store.loadLegacyEndpoints (list);
for (List::const_iterator iter (list.begin());
iter != list.end(); ++iter)
m_legacyEndpoints->insert (*iter);
m_legacyEndpoints.swap();
m_legacyCache.insert (*iter);
m_journal.debug << "Loaded " << list.size() << " legacy endpoints";
}
@@ -253,17 +220,6 @@ public:
//
void createLegacyEndpointList (std::vector <IPEndpoint>& list)
{
list.clear ();
list.reserve (m_legacyEndpoints.front().size() +
m_legacyEndpoints.back().size());
for (LegacyEndpoints::container_type::const_iterator iter (
m_legacyEndpoints.front().begin()); iter != m_legacyEndpoints.front().end(); ++iter)
list.push_back (*iter);
for (LegacyEndpoints::container_type::const_iterator iter (
m_legacyEndpoints.back().begin()); iter != m_legacyEndpoints.back().end(); ++iter)
list.push_back (*iter);
}
// Make outgoing connections to bring us up to desired out count
@@ -272,13 +228,9 @@ public:
{
if (m_slots.outDesired > m_slots.outboundCount)
{
int const needed (m_slots.outDesired - m_slots.outboundCount);
std::vector <IPEndpoint> list;
createLegacyEndpointList (list);
std::random_shuffle (list.begin(), list.end());
int needed = m_slots.outDesired - m_slots.outboundCount;
if (needed > list.size())
needed = list.size();
m_legacyCache.get (needed, list);
#if RIPPLE_USE_PEERFINDER
m_callback.connectPeerEndpoints (list);
@@ -290,6 +242,7 @@ public:
//
void fetch (Source& source)
{
#if 0
m_journal.debug << "Fetching " << source.name();
Source::IPEndpoints endpoints;
@@ -299,16 +252,17 @@ public:
{
for (Source::IPEndpoints::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
m_legacyEndpoints->insert (*iter);
m_legacyCache->insert (*iter);
if (m_legacyEndpoints->size() > (numberOfLegacyEndpoints/2))
if (m_legacyCache->size() > (numberOfLegacyEndpoints/2))
{
m_legacyEndpoints.swap();
m_legacyEndpoints->clear();
m_legacyCache.swap();
m_legacyCache->clear();
}
m_legacyEndpointsDirty = true;
m_legacyCacheDirty = true;
}
#endif
}
//----------------------------------------------------------------------
@@ -398,6 +352,75 @@ public:
}
}
// Called when the Checker completes a connectivity test
//
void onCheckEndpoint (PeerID const& id, Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
Peers::iterator iter (m_peers.find (id));
if (iter != m_peers.end())
{
PeerInfo const& peer (*iter);
if (! result.error)
{
peer.checked = true;
peer.canAccept = result.canAccept;
if (peer.canAccept)
m_journal.info << "Peer " << peer.address <<
" passed listening test";
else
m_journal.warning << "Peer " << peer.address <<
" cannot accept incoming connections";
}
else
{
// VFALCO TODO Should we retry depending on the error?
peer.checked = true;
peer.canAccept = false;
m_journal.error << "Listening test for " <<
peer.address << " failed: " <<
result.error.message();
}
}
else
{
// The peer disconnected before we finished the check
m_journal.debug << "Finished listening test for " <<
id << " but the peer disconnected. ";
}
}
// Called when the Checker completes a connectivity test for a legacy address
//
void onCheckLegacyEndpoint (IPEndpoint const& endpoint,
Checker::Result const& result)
{
if (result.error == boost::asio::error::operation_aborted)
return;
RelativeTime const now (RelativeTime::fromStartup());
if (! result.error)
{
if (result.canAccept)
m_journal.info << "Legacy address " << endpoint <<
" passed listening test";
else
m_journal.warning << "Legacy address " << endpoint <<
" cannot accept incoming connections";
}
else
{
m_journal.error << "Listening test for legacy address " <<
endpoint << " failed: " << result.error.message();
}
}
// Processes a list of Endpoint received from a peer.
//
void onPeerEndpoints (PeerID const& id, std::vector <Endpoint> endpoints)
@@ -426,6 +449,34 @@ public:
}
// process the list
{
bool foundZeroHops (false);
bool chargedPenalty (false);
for (std::vector <Endpoint>::const_iterator iter (endpoints.begin());
iter != endpoints.end(); ++iter)
{
Endpoint const& endpoint (*iter);
if (endpoint.hops == 0)
{
if (! foundZeroHops)
{
foundZeroHops = true;
m_checker.async_test (endpoint.address.withPort (
endpoint.port), bind (&Logic::onCheckEndpoint,
this, id, _1));
}
else if (! chargedPenalty)
{
// Only charge them once (?)
chargedPenalty = true;
// More than one zero-hops message?!
m_journal.warning << "Charging peer " << peer.address <<
" for sending more than one hops==0 endpoint";
m_callback.chargePeerLoadPenalty (id);
}
}
}
}
peer.whenReceiveEndpoints = now + secondsPerEndpoints;
}
@@ -433,8 +484,6 @@ public:
{
m_journal.warning << "Charging peer " << peer.address <<
" for sending too quickly";
// Peer sent mtENDPOINTS too often
m_callback.chargePeerLoadPenalty (id);
}
}
@@ -446,28 +495,25 @@ public:
void onPeerLegacyEndpoint (IPEndpoint const& ep)
{
if (ep.isPublic())
{
// insert into front container
std::pair <LegacyEndpoints::container_type::iterator, bool> result (
m_legacyEndpoints->insert (ep));
// filter invalid addresses
if (! ep.isPublic())
return;
if (ep.port() == 0)
return;
std::pair <LegacyEndpoint&, bool> result (
m_legacyCache.insert (ep));
// erase from back container if its new
if (result.second)
{
std::size_t const n (m_legacyEndpoints.back().erase (ep));
if (n == 0)
{
m_legacyEndpointsDirty = true;
// its new
m_legacyCacheDirty = true;
m_journal.trace << "Legacy endpoint: " << ep;
}
}
if (m_legacyEndpoints->size() > (numberOfLegacyEndpoints/2))
{
m_legacyEndpoints.swap();
m_legacyEndpoints->clear();
}
m_checker.async_test (ep, bind (
&Logic::onCheckLegacyEndpoint,
this, ep, _1));
}
}
@@ -475,9 +521,10 @@ public:
//
void storeLegacyEndpoints ()
{
if (!m_legacyEndpointsDirty)
if (!m_legacyCacheDirty)
return;
#if 0
std::vector <IPEndpoint> list;
createLegacyEndpointList (list);
@@ -486,7 +533,8 @@ public:
m_store.storeLegacyEndpoints (list);
m_legacyEndpointsDirty = false;
m_legacyCacheDirty = false;
#endif
}
};

View File

@@ -172,9 +172,6 @@ Revised Gnutella Ping Pong Scheme
http://rfc-gnutella.sourceforge.net/src/pong-caching.html
*/
#include "Logic.h"
#include "StoreSqdb.h"
namespace ripple {
namespace PeerFinder {
@@ -188,6 +185,7 @@ public:
ServiceQueue m_queue;
Journal m_journal;
StoreSqdb m_store;
CheckerAdapter m_checker;
Logic m_logic;
DeadlineTimer m_connectTimer;
DeadlineTimer m_endpointsTimer;
@@ -200,11 +198,12 @@ public:
, Thread ("PeerFinder")
, m_journal (journal)
, m_store (journal)
, m_logic (callback, m_store, journal)
, m_checker (m_queue)
, m_logic (callback, m_store, m_checker, journal)
, m_connectTimer (this)
, m_endpointsTimer (this)
{
#if 0
#if 1
#if BEAST_MSVC
if (beast_isRunningUnderDebugger())
{
@@ -281,6 +280,8 @@ public:
void onStop ()
{
m_checker.cancel ();
if (this->Thread::isThreadRunning ())
{
m_journal.debug << "Stopping";

View File

@@ -20,12 +20,6 @@
#ifndef RIPPLE_PEERFINDER_PEERINFO_H_INCLUDED
#define RIPPLE_PEERFINDER_PEERINFO_H_INCLUDED
#include "../../ripple/types/api/AgedHistory.h"
#include "../api/Types.h"
#include <set>
namespace ripple {
namespace PeerFinder {
@@ -42,6 +36,8 @@ struct PeerInfo
: id (id_)
, address (address_)
, inbound (inbound_)
, checked (inbound_ ? false : true)
, canAccept (inbound_ ? false : true)
, whenSendEndpoints (RelativeTime::fromStartup())
, whenReceiveEndpoints (RelativeTime::fromStartup())
{
@@ -51,6 +47,14 @@ struct PeerInfo
IPEndpoint address;
bool inbound;
// Tells us if we checked the connection. Outbound connections
// are always considered checked since we successfuly connected.
bool mutable checked;
// Set to indicate if the connection can receive incoming at the
// address advertised in mtENDPOINTS. Only valid if checked is true
bool mutable canAccept;
// The time after which we will send the peer mtENDPOINTS
RelativeTime mutable whenSendEndpoints;

View File

@@ -20,8 +20,6 @@
#ifndef RIPPLE_PEERFINDER_SLOTS_H_INCLUDED
#define RIPPLE_PEERFINDER_SLOTS_H_INCLUDED
#include "../api/Config.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -17,8 +17,6 @@
*/
//==============================================================================
#include "SourceStrings.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -20,8 +20,6 @@
#ifndef RIPPLE_PEERFINDER_SOURCESTRINGS_H_INCLUDED
#define RIPPLE_PEERFINDER_SOURCESTRINGS_H_INCLUDED
#include "Source.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -20,10 +20,6 @@
#ifndef RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED
#define RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED
#include "beast/modules/beast_sqdb/beast_sqdb.h"
#include "Store.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -17,8 +17,6 @@
*/
//==============================================================================
#include "Logic.h"
namespace ripple {
namespace PeerFinder {

View File

@@ -0,0 +1,51 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_PEERFINDER_TUNING_H_INCLUDED
#define RIPPLE_PEERFINDER_TUNING_H_INCLUDED
namespace ripple {
namespace PeerFinder {
// Tunable constants
enum
{
// How often we will try to make outgoing connections
secondsPerConnect = 10,
// How often we send or accept mtENDPOINTS messages per peer
secondsPerEndpoints = 5,
// How many Endpoint to send in each mtENDPOINTS
numberOfEndpoints = 10,
// The most Endpoint we will accept in mtENDPOINTS
numberOfEndpointsMax = 20,
// How many legacy endpoints to keep in our cache
numberOfLegacyEndpoints = 1000,
// How often legacy endpoints are updated in the database
legacyEndpointUpdateSeconds = 60 * 60
};
}
}
#endif

View File

@@ -17,21 +17,49 @@
*/
//==============================================================================
#include "BeastConfig.h"
#include "ripple_peerfinder.h"
#include "../../ripple/types/api/AgedHistory.h"
#include <set>
#include "beast/modules/beast_core/system/BeforeBoost.h"
#include <boost/optional.hpp>
#include <boost/regex.hpp>
#include <boost/unordered_set.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
#include "beast/modules/beast_sqdb/beast_sqdb.h"
#include "beast/modules/beast_asio/beast_asio.h"
namespace ripple {
using namespace beast;
}
# include "impl/Source.h"
# include "impl/SourceStrings.h"
#include "impl/SourceStrings.cpp"
# include "impl/Checker.h"
#include "impl/CheckerAdapter.h"
#include "impl/EndpointCache.h"
#include "impl/Slots.h"
#include "impl/Source.h"
#include "impl/SourceStrings.h"
# include "impl/LegacyEndpoint.h"
# include "impl/LegacyEndpointCache.h"
# include "impl/PeerInfo.h"
# include "impl/Store.h"
# include "impl/Tuning.h"
#include "impl/StoreSqdb.h"
#include "impl/Logic.h"
#include "impl/Checker.cpp"
#include "impl/Config.cpp"
#include "impl/Endpoint.cpp"
#include "impl/EndpointCache.cpp"
#include "impl/Manager.cpp"
#include "impl/Slots.cpp"
#include "impl/SourceStrings.cpp"
#include "impl/Tests.cpp"

View File

@@ -28,10 +28,10 @@ using namespace beast;
#include "../types/api/RipplePublicKey.h"
#include "api/Types.h"
#include "api/Endpoint.h"
#include "api/Config.h"
# include "api/Endpoint.h"
# include "api/Types.h"
#include "api/Callback.h"
#include "api/Config.h"
#include "api/Manager.h"
#endif

View File

@@ -0,0 +1,110 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_TYPES_CYCLEDSET_H_INCLUDED
#define RIPPLE_TYPES_CYCLEDSET_H_INCLUDED
namespace ripple {
/** Cycled set of unique keys.
This provides a system of remembering a set of keys, with aging. Two
containers are kept. When one container fills, the other is cleared
and a swap is performed. A key is considered present if it is in either
container.
*/
template <class Key,
class Hash = typename Key::hasher,
class KeyEqual = std::equal_to <Key>,
class Allocator = std::allocator <Key> >
class CycledSet
{
private:
typedef boost::unordered_set<
Key, Hash, KeyEqual, Allocator> ContainerType;
typedef typename ContainerType::iterator iterator;
public:
typedef typename ContainerType::key_type key_type;
typedef typename ContainerType::value_type value_type;
typedef typename ContainerType::size_type size_type;
typedef typename ContainerType::difference_type difference_type;
typedef typename ContainerType::hasher hasher;
typedef typename ContainerType::key_equal key_equal;
typedef typename ContainerType::allocator_type allocator_type;
typedef typename ContainerType::reference reference;
typedef typename ContainerType::const_reference const_reference;
typedef typename ContainerType::pointer pointer;
typedef typename ContainerType::const_pointer const_pointer;
explicit CycledSet (
size_type item_max,
Hash hash = Hash(),
KeyEqual equal = KeyEqual(),
Allocator alloc = Allocator())
: m_max (item_max)
, m_hash (hash)
, m_equal (equal)
, m_alloc (alloc)
, m_front (m_max, hash, equal, alloc)
, m_back (m_max, hash, equal, alloc)
{
}
// Returns `true` if the next real insert would swap
bool full() const
{
return m_front.size() >= m_max;
}
// Adds the key to the front if its not in either map
bool insert (key_type const& key)
{
if (full())
cycle ();
if (m_back.find (key) != m_back.end())
return false;
std::pair <iterator, bool> result (
m_front.insert (key));
if (result.second)
return true;
return false;
}
void cycle ()
{
std::swap (m_front, m_back);
m_front.clear ();
#if BOOST_VERSION > 105400
m_front.reserve (m_max);
#endif
}
private:
size_type m_max;
hasher m_hash;
key_equal m_equal;
allocator_type m_alloc;
ContainerType m_front;
ContainerType m_back;
};
}
#endif

View File

@@ -26,6 +26,7 @@
#include "beast/modules/beast_core/system/BeforeBoost.h"
#include <boost/functional/hash.hpp>
#include <boost/unordered_set.hpp>
// For ByteOrder
#if BEAST_WIN32
@@ -43,6 +44,7 @@ using namespace beast;
}
#include "api/AgedHistory.h"
#include "api/CycledSet.h"
# include "api/Blob.h"
# include "api/Base58.h"
# include "api/ByteOrder.h"

View File

@@ -153,94 +153,6 @@ private:
Info m_back_info;
};
//------------------------------------------------------------------------------
/** Cycled set of unique keys. */
template <class Key,
class Hash = typename Key::hasher,
class KeyEqual = std::equal_to <Key>,
class Allocator = std::allocator <Key> >
class CycledSet
{
private:
typedef boost::unordered_set <
Key, Hash, KeyEqual, Allocator> ContainerType;
typedef typename ContainerType::iterator iterator;
public:
typedef typename ContainerType::key_type key_type;
typedef typename ContainerType::value_type value_type;
typedef typename ContainerType::size_type size_type;
typedef typename ContainerType::difference_type difference_type;
typedef typename ContainerType::hasher hasher;
typedef typename ContainerType::key_equal key_equal;
typedef typename ContainerType::allocator_type allocator_type;
typedef typename ContainerType::reference reference;
typedef typename ContainerType::const_reference const_reference;
typedef typename ContainerType::pointer pointer;
typedef typename ContainerType::const_pointer const_pointer;
explicit CycledSet (
size_type item_max,
Hash hash = Hash(),
KeyEqual equal = KeyEqual(),
Allocator alloc = Allocator())
: m_max (item_max)
, m_hash (hash)
, m_equal (equal)
, m_alloc (alloc)
, m_front (m_max, hash, equal, alloc)
, m_back (m_max, hash, equal, alloc)
{
}
// Returns `true` if the next real insert would swap
bool full() const
{
return m_front.size() >= m_max;
}
// Adds the key to the front if its not in either map
bool insert (key_type const& key)
{
if (full())
cycle ();
if (m_back.find (key) != m_back.end())
return false;
std::pair <iterator, bool> result (
m_front.insert (key));
if (result.second)
return true;
return false;
}
#if 0
bool find (key_type const& key)
{
if (m_front.find (key) != m_front.end())
return true;
return m_back.find (key) != m_back.end();
}
#endif
void cycle ()
{
std::swap (m_front, m_back);
m_front.clear ();
#if BOOST_VERSION > 105400
m_front.reserve (m_max);
#endif
}
private:
size_type m_max;
hasher m_hash;
key_equal m_equal;
allocator_type m_alloc;
ContainerType m_front;
ContainerType m_back;
};
}
}