Refactor PeerFinder:

* Revise documentation in README.md
* Inject abstract_clock in Manager
* Introduce the Slot object as a replacement for Peer
* New bullet-proof method for slot accounting
* Replace Peer with Slot for tracking connections
* Prevent duplicate outbound connection attempts
* Improved connection and bootstrap business logic
* Refactor PeerImp, PeersImp private interfaces
* Give PeersImp access to the PeerImp interface
* Handle errors retrieving endpoints from asio sockets
* Use weak_ptr to manage PeerImp lifetime
* Better handling of socket closure in PeerImp
* Improve the orderly shutdown logic of PeersImp
This commit is contained in:
Vinnie Falco
2014-02-06 12:23:13 -08:00
parent 12748e7539
commit 3a1a5d12de
35 changed files with 2427 additions and 2455 deletions

View File

@@ -30,11 +30,12 @@ class ManagerImp
public:
ServiceQueue m_queue;
SiteFiles::Manager& m_siteFiles;
clock_type& m_clock;
Journal m_journal;
StoreSqdb m_store;
SerializedContext m_context;
CheckerAdapter m_checker;
LogicType <SimpleMonotonicClock> m_logic;
Logic m_logic;
DeadlineTimer m_connectTimer;
DeadlineTimer m_messageTimer;
DeadlineTimer m_cacheTimer;
@@ -45,14 +46,16 @@ public:
Stoppable& stoppable,
SiteFiles::Manager& siteFiles,
Callback& callback,
clock_type& clock,
Journal journal)
: Manager (stoppable)
, Thread ("PeerFinder")
, m_siteFiles (siteFiles)
, m_clock (clock)
, m_journal (journal)
, m_store (journal)
, m_checker (m_context, m_queue)
, m_logic (callback, m_store, m_checker, journal)
, m_logic (clock, callback, m_store, m_checker, journal)
, m_connectTimer (this)
, m_messageTimer (this)
, m_cacheTimer (this)
@@ -103,72 +106,48 @@ public:
//--------------------------------------------------------------------------
void onPeerAccept (IPAddress const& local_address,
IPAddress const& remote_address)
Slot::ptr new_inbound_slot (
IP::Endpoint const& local_endpoint,
IP::Endpoint const& remote_endpoint)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerAccept, &m_logic,
local_address, remote_address)));
return m_logic.new_inbound_slot (local_endpoint, remote_endpoint);
}
void onPeerConnect (IPAddress const& address)
Slot::ptr new_outbound_slot (IP::Endpoint const& remote_endpoint)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerConnect, &m_logic,
address)));
return m_logic.new_outbound_slot (remote_endpoint);
}
void onPeerConnected (IPAddress const& local_address,
IPAddress const& remote_address)
void on_connected (Slot::ptr const& slot,
IP::Endpoint const& local_endpoint)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerConnected, &m_logic,
local_address, remote_address)));
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_connected (impl, local_endpoint);
}
void onPeerAddressChanged (
IPAddress const& currentAddress, IPAddress const& newAddress)
void on_handshake (Slot::ptr const& slot,
RipplePublicKey const& key, bool cluster)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerAddressChanged, &m_logic,
currentAddress, newAddress)));
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_handshake (impl, key, cluster);
}
void onPeerHandshake (IPAddress const& address, PeerID const& id,
bool cluster)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerHandshake, &m_logic,
address, id, cluster)));
}
void onPeerClosed (IPAddress const& address)
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::onPeerClosed, &m_logic,
address)));
}
void onPeerEndpoints (IPAddress const& address,
void on_endpoints (Slot::ptr const& slot,
Endpoints const& endpoints)
{
m_queue.dispatch (
beast::bind (&Logic::onPeerEndpoints, &m_logic,
address, endpoints));
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_endpoints (impl, endpoints);
}
void onLegacyEndpoints (IPAddresses const& addresses)
void on_legacy_endpoints (IPAddresses const& addresses)
{
m_queue.dispatch (
m_context.wrap (
beast::bind (&Logic::onLegacyEndpoints, &m_logic,
addresses)));
m_logic.on_legacy_endpoints (addresses);
}
void on_closed (Slot::ptr const& slot)
{
SlotImp::ptr impl (std::dynamic_pointer_cast <SlotImp> (slot));
m_logic.on_closed (impl);
}
//--------------------------------------------------------------------------
@@ -281,7 +260,7 @@ public:
{
m_queue.dispatch (
m_context.wrap (
bind (&Logic::sendEndpoints, &m_logic)));
bind (&Logic::broadcast, &m_logic)));
m_messageTimer.setExpiration (Tuning::secondsPerMessage);
}
@@ -359,9 +338,10 @@ Manager* Manager::New (
Stoppable& parent,
SiteFiles::Manager& siteFiles,
Callback& callback,
clock_type& clock,
Journal journal)
{
return new ManagerImp (parent, siteFiles, callback, journal);
return new ManagerImp (parent, siteFiles, callback, clock, journal);
}
}