20 #ifndef RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
21 #define RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
23 #include <ripple/basics/Log.h>
24 #include <ripple/basics/contract.h>
25 #include <ripple/basics/random.h>
26 #include <ripple/beast/container/aged_container_utility.h>
27 #include <ripple/beast/net/IPAddressConversion.h>
28 #include <ripple/peerfinder/PeerfinderManager.h>
29 #include <ripple/peerfinder/impl/Bootcache.h>
30 #include <ripple/peerfinder/impl/Counts.h>
31 #include <ripple/peerfinder/impl/Fixed.h>
32 #include <ripple/peerfinder/impl/Handouts.h>
33 #include <ripple/peerfinder/impl/Livecache.h>
34 #include <ripple/peerfinder/impl/Reporting.h>
35 #include <ripple/peerfinder/impl/SlotImp.h>
36 #include <ripple/peerfinder/impl/Source.h>
37 #include <ripple/peerfinder/impl/Store.h>
38 #include <ripple/peerfinder/impl/iosformat.h>
47 namespace PeerFinder {
53 template <
class Checker>
188 if (addresses.
empty())
191 <<
"Could not resolve fixed slot '" << name <<
"'";
195 for (
auto const& remote_address : addresses)
197 if (remote_address.port() == 0)
199 Throw<std::runtime_error>(
200 "Port not specified for address:" +
201 remote_address.to_string());
204 auto result(
fixed_.emplace(
205 std::piecewise_construct,
213 <<
"' at " << remote_address;
226 boost::system::error_code ec)
228 if (ec == boost::asio::error::operation_aborted)
237 <<
beast::leftw(18) <<
"Logic tested " << checkedAddress
238 <<
" but the connection was closed";
252 <<
" with error, " << ec.message();
260 << checkedAddress <<
" succeeded";
271 <<
beast::leftw(18) <<
"Logic accept" << remote_endpoint
272 <<
" on local " << local_endpoint;
277 if (is_public(remote_endpoint))
285 << remote_endpoint <<
" because of ip limits.";
297 auto const result(
slots_.
emplace(slot->remote_endpoint(), slot));
299 assert(result.second);
306 return result.first->second;
314 <<
beast::leftw(18) <<
"Logic connect " << remote_endpoint;
322 <<
beast::leftw(18) <<
"Logic dropping " << remote_endpoint
323 <<
" as duplicate connect";
332 auto const result =
slots_.
emplace(slot->remote_endpoint(), slot);
334 assert(result.second);
342 return result.first->second;
351 <<
beast::leftw(18) <<
"Logic connected" << slot->remote_endpoint()
352 <<
" on local " << local_endpoint;
359 slot->local_endpoint(local_endpoint);
367 iter->second->local_endpoint() == slot->remote_endpoint());
370 << slot->remote_endpoint() <<
" as self connect";
386 <<
beast::leftw(18) <<
"Logic handshake " << slot->remote_endpoint()
387 <<
" with " << (reserved ?
"reserved " :
"") <<
"key " << key;
404 slot->reserved(reserved);
410 if (!slot->inbound())
417 slot->public_key(key);
419 [[maybe_unused]]
bool const inserted =
keys_.insert(key).second;
429 if (!slot->inbound())
433 if (slot->fixed() && !slot->inbound())
435 auto iter(
fixed_.find(slot->remote_endpoint()));
438 "PeerFinder::Logic::activate(): remote_endpoint "
439 "missing from fixed_");
443 << slot->remote_endpoint() <<
" success";
460 return std::move(h.
list());
485 for (
auto const& s :
slots_)
537 << ((h.
list().
size() > 1) ?
"endpoints" :
"endpoint");
575 << ((h.
list().
size() > 1) ?
"addresses" :
"address");
603 [&slots](Slots::value_type
const& value) {
604 if (value.second->state() == Slot::active)
605 slots.emplace_back(value.second);
615 targets.emplace_back(slot);
644 for (
auto& t : targets)
657 for (
auto const& t : targets)
660 auto const& list = t.list();
663 << slot->remote_endpoint() <<
" with " << list.size()
664 << ((list.size() == 1) ?
" endpoint" :
" endpoints");
683 for (
auto const& entry :
slots_)
684 entry.second->expire();
698 bool neighbor(
false);
699 for (
auto iter = list.
begin(); iter != list.
end();)
708 <<
" for excess hops " << ep.
hops;
709 iter = list.
erase(iter);
721 slot->remote_endpoint().at_port(ep.
address.
port());
727 <<
" for extra self";
728 iter = list.
erase(iter);
737 << ep.
address <<
" as invalid";
738 iter = list.
erase(iter);
746 [ep](Endpoints::value_type
const& other) {
747 return ep.address == other.address;
751 << ep.
address <<
" as duplicate";
752 iter = list.
erase(iter);
776 <<
beast::leftw(18) <<
"Endpoints from " << slot->remote_endpoint()
777 <<
" contained " << list.
size()
778 << ((list.
size() > 1) ?
" entries" :
" entry");
791 if (slot->whenAcceptEndpoints > now)
796 for (
auto const& ep : list)
798 assert(ep.hops != 0);
800 slot->recent.insert(ep.address, ep.hops);
807 if (slot->connectivityCheckInProgress)
811 <<
" already in progress";
818 slot->connectivityCheckInProgress =
true;
828 slot->remote_endpoint(),
830 std::placeholders::_1));
841 if (!slot->canAccept)
862 auto const iter =
slots_.
find(slot->remote_endpoint());
866 "PeerFinder::Logic::remove(): remote_endpoint "
867 "missing from slots_");
873 if (slot->public_key() != std::nullopt)
875 auto const iter =
keys_.find(*slot->public_key());
877 if (iter ==
keys_.end())
879 "PeerFinder::Logic::remove(): public_key missing "
891 "PeerFinder::Logic::remove(): remote_endpont "
892 "address missing from connectedAddresses_");
909 if (slot->fixed() && !slot->inbound() && slot->state() !=
Slot::active)
911 auto iter(
fixed_.find(slot->remote_endpoint()));
914 "PeerFinder::Logic::on_closed(): remote_endpont "
915 "missing from fixed_");
919 << slot->remote_endpoint() <<
" failed";
923 switch (slot->state())
927 << slot->remote_endpoint() <<
" failed";
942 << slot->remote_endpoint();
947 << slot->remote_endpoint();
965 template <
class FwdIter>
970 boost::asio::ip::tcp::endpoint
const& remote_address);
979 for (
auto const& entry :
fixed_)
980 if (entry.first == endpoint)
991 for (
auto const& entry :
fixed_)
992 if (entry.first.address() == address)
1004 template <
class Container>
1012 for (
auto iter =
fixed_.begin(); needed && iter !=
fixed_.end(); ++iter)
1014 auto const& address(iter->first.address());
1015 if (iter->second.when() <= now &&
1016 squelches.
find(address) == squelches.
end() &&
1020 [address](Slots::value_type
const& v) {
1021 return address == v.first.address();
1024 squelches.
insert(iter->first.address());
1025 c.push_back(iter->first);
1059 for (
auto addr : list)
1098 <<
beast::leftw(18) <<
"Logic added " << count <<
" new "
1099 << ((count == 1) ?
"address" :
"addresses") <<
" from "
1105 <<
"'" << source->name() <<
"' fetch, "
1106 << results.
error.message();
1120 if (is_unspecified(address))
1122 if (!is_public(address))
1124 if (address.
port() == 0)
1138 for (
auto const& entry : slots)
1141 SlotImp const& slot(*entry.second);
1146 item[
"inbound"] =
"yes";
1148 item[
"fixed"] =
"yes";
1150 item[
"reserved"] =
"yes";
1229 template <
class Checker>
1230 template <
class FwdIter>
1235 boost::asio::ip::tcp::endpoint
const& remote_address)
1243 JLOG(m_journal.trace()) <<
beast::leftw(18) <<
"Logic add " << n
1244 <<
" redirect IPs from " << remote_address;
Tests remote listening sockets to make sure they are connectible.
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a statically-configured address to the cache.
std::vector< std::shared_ptr< Source > > m_sources
bool inbound() const override
Returns true if this is an inbound connection.
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
Stream trace() const
Severity stream access functions.
Receives handouts for making automatic connections.
The Logic for maintaining the list of Slot addresses.
std::set< PublicKey > keys_
SlotImp::ptr new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
void on_failure(SlotImp::ptr const &slot)
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
constexpr std::chrono::seconds secondsPerMessage(151)
clock_type::time_point m_whenBroadcast
Receives handouts for redirecting a connection.
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
std::map< beast::IP::Endpoint, Fixed > fixed_
Address const & address() const
Returns the address portion of this endpoint.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool >>::type
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
constexpr std::uint32_t numberOfEndpointsMax
Counts const & counts() const
void periodicActivity()
Stores the cache in the persistent database on a timer.
Abstract persistence for PeerFinder data.
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
int inboundSlots() const
Returns the total number of inbound slots.
void load()
Load the persisted data from the Store into the container.
virtual time_point now() const =0
Returns the current time.
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
int out_max() const
Returns the total number of outbound slots.
Manages the count of available connections for the various slots.
std::vector< Endpoint > & list()
map_type::size_type size() const
Returns the number of entries in the cache.
bool wantIncoming
true if we want to accept incoming connections.
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
iterator find(K const &k)
boost::asio::ip::address Address
void addStaticSource(std::shared_ptr< Source > const &source)
std::recursive_mutex lock_
Stores IP addresses useful for gaining initial connections.
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
void onWrite(beast::PropertyStream::Map &map)
int out_active() const
Returns the number of outbound peers assigned an open slot.
const_iterator end() const
void shuffle()
Shuffle each hop list.
reverse_iterator rbegin()
std::multiset< beast::IP::Address > connectedAddresses_
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
beast::xor_shift_engine & default_prng()
Return the default random engine.
void stop()
Stop the logic.
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Port port() const
Returns the port number on the endpoint.
void onConfig(Config const &config)
Called when the config is set or changed.
bool fixed(beast::IP::Address const &address) const
The Livecache holds the short-lived relayed Endpoint messages.
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
A generic endpoint for log messages.
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
T forward_as_tuple(T... args)
constexpr std::chrono::seconds recentAttemptDuration(60)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
std::shared_ptr< Source > fetchSource_
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
void addSource(std::shared_ptr< Source > const &source)
boost::asio::ip::address_v6 AddressV6
boost::system::error_code error
void fetch(std::shared_ptr< Source > const &source)
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
static std::string stateString(Slot::State state)
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
void remove(SlotImp::ptr const &slot)
class ripple::PeerFinder::Livecache::hops_t hops
Left justifies a field at the specified width.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::uint16_t listeningPort
The listening port number.
Associative container where each element is also indexed by time.
std::size_t fixed_active() const
Returns the number of active fixed connections.
bool fixed(beast::IP::Endpoint const &endpoint) const
bool try_insert(beast::IP::Endpoint const &endpoint)
std::size_t attempts() const
Returns the number of outbound connection attempts.
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
void set_listening_port(std::uint16_t port)
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
void expire()
Erase entries whose time has expired.
SlotImp::ptr new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
bool is_valid_address(beast::IP::Endpoint const &address)
A version-independent IP address and port combination.
std::shared_ptr< SlotImp > ptr
bool autoConnect
true if we want to establish connections automatically
constexpr std::uint32_t maxHops
PeerFinder configuration settings.
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
void on_closed(SlotImp::ptr const &slot)
State state() const override
Returns the state of the connection.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Describes a connectible peer address along with some metadata.
void config(Config const &c)
bool reserved() const override
Returns true if this is a reserved connection.
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
typename Clock::time_point time_point
ConnectHandouts::Squelches m_squelches
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
beast::IP::Endpoint address
int ipLimit
Limit how many incoming connections we allow per IP.
bool connectivityCheckInProgress
bool fixed() const override
Returns true if this is a fixed connection.
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
int addBootcacheAddresses(IPAddresses const &list)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
Result
Possible results from activating a slot.