rippled
Loading...
Searching...
No Matches
src/xrpld/peerfinder/detail/Logic.h
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#ifndef RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
21#define RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
22
23#include <xrpld/peerfinder/PeerfinderManager.h>
24#include <xrpld/peerfinder/detail/Bootcache.h>
25#include <xrpld/peerfinder/detail/Counts.h>
26#include <xrpld/peerfinder/detail/Fixed.h>
27#include <xrpld/peerfinder/detail/Handouts.h>
28#include <xrpld/peerfinder/detail/Livecache.h>
29#include <xrpld/peerfinder/detail/SlotImp.h>
30#include <xrpld/peerfinder/detail/Source.h>
31#include <xrpld/peerfinder/detail/Store.h>
32#include <xrpld/peerfinder/detail/iosformat.h>
33
34#include <xrpl/basics/Log.h>
35#include <xrpl/basics/contract.h>
36#include <xrpl/basics/random.h>
37#include <xrpl/beast/net/IPAddressConversion.h>
38
39#include <algorithm>
40#include <functional>
41#include <map>
42#include <memory>
43#include <set>
44
45namespace ripple {
46namespace PeerFinder {
47
52template <class Checker>
53class Logic
54{
55public:
56 // Maps remote endpoints to slots. Since a slot has a
57 // remote endpoint upon construction, this holds all counts.
58 //
60
65
67
68 // True if we are stopping.
69 bool stopping_ = false;
70
71 // The source we are currently fetching.
72 // This is used to cancel I/O during program exit.
74
75 // Configuration settings
77
78 // Slot counts and other aggregate statistics.
80
81 // A list of slots that should always be connected
83
84 // Live livecache from mtENDPOINTS messages
86
87 // LiveCache of addresses suitable for gaining initial connections
89
90 // Holds all counts
92
93 // The addresses (but not port) we are connected to. This includes
94 // outgoing connection attempts. Note that this set can contain
95 // duplicates (since the port is not set)
97
98 // Set of public keys belonging to active peers
100
101 // A list of dynamic sources to consult as a fallback
103
105
107
108 //--------------------------------------------------------------------------
109
111 clock_type& clock,
112 Store& store,
113 Checker& checker,
114 beast::Journal journal)
115 : m_journal(journal)
116 , m_clock(clock)
117 , m_store(store)
118 , m_checker(checker)
119 , livecache_(m_clock, journal)
120 , bootcache_(store, m_clock, journal)
121 , m_whenBroadcast(m_clock.now())
123 {
124 config({});
125 }
126
127 // Load persistent state information from the Store
128 //
129 void
131 {
134 }
135
142 void
144 {
146 stopping_ = true;
147 if (fetchSource_ != nullptr)
148 fetchSource_->cancel();
149 }
150
151 //--------------------------------------------------------------------------
152 //
153 // Manager
154 //
155 //--------------------------------------------------------------------------
156
157 void
158 config(Config const& c)
159 {
161 config_ = c;
163 }
164
165 Config
167 {
169 return config_;
170 }
171
172 void
174 {
176 v.push_back(ep);
177 addFixedPeer(name, v);
178 }
179
180 void
182 std::string const& name,
183 std::vector<beast::IP::Endpoint> const& addresses)
184 {
186
187 if (addresses.empty())
188 {
189 JLOG(m_journal.info())
190 << "Could not resolve fixed slot '" << name << "'";
191 return;
192 }
193
194 for (auto const& remote_address : addresses)
195 {
196 if (remote_address.port() == 0)
197 {
198 Throw<std::runtime_error>(
199 "Port not specified for address:" +
200 remote_address.to_string());
201 }
202
203 auto result(fixed_.emplace(
205 std::forward_as_tuple(remote_address),
207
208 if (result.second)
209 {
210 JLOG(m_journal.debug())
211 << beast::leftw(18) << "Logic add fixed '" << name
212 << "' at " << remote_address;
213 return;
214 }
215 }
216 }
217
218 //--------------------------------------------------------------------------
219
220 // Called when the Checker completes a connectivity test
221 void
223 beast::IP::Endpoint const& remoteAddress,
224 beast::IP::Endpoint const& checkedAddress,
225 boost::system::error_code ec)
226 {
227 if (ec == boost::asio::error::operation_aborted)
228 return;
229
231 auto const iter(slots_.find(remoteAddress));
232 if (iter == slots_.end())
233 {
234 // The slot disconnected before we finished the check
235 JLOG(m_journal.debug())
236 << beast::leftw(18) << "Logic tested " << checkedAddress
237 << " but the connection was closed";
238 return;
239 }
240
241 SlotImp& slot(*iter->second);
242 slot.checked = true;
243 slot.connectivityCheckInProgress = false;
244
245 if (ec)
246 {
247 // VFALCO TODO Should we retry depending on the error?
248 slot.canAccept = false;
249 JLOG(m_journal.error())
250 << beast::leftw(18) << "Logic testing " << iter->first
251 << " with error, " << ec.message();
252 bootcache_.on_failure(checkedAddress);
253 return;
254 }
255
256 slot.canAccept = true;
257 slot.set_listening_port(checkedAddress.port());
258 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic testing "
259 << checkedAddress << " succeeded";
260 }
261
262 //--------------------------------------------------------------------------
263
266 beast::IP::Endpoint const& local_endpoint,
267 beast::IP::Endpoint const& remote_endpoint)
268 {
269 JLOG(m_journal.debug())
270 << beast::leftw(18) << "Logic accept" << remote_endpoint
271 << " on local " << local_endpoint;
272
274
275 // Check for connection limit per address
276 if (is_public(remote_endpoint))
277 {
278 auto const count =
279 connectedAddresses_.count(remote_endpoint.address());
280 if (count > config_.ipLimit)
281 {
282 JLOG(m_journal.debug())
283 << beast::leftw(18) << "Logic dropping inbound "
284 << remote_endpoint << " because of ip limits.";
285 return SlotImp::ptr();
286 }
287 }
288
289 // Check for duplicate connection
290 if (slots_.find(remote_endpoint) != slots_.end())
291 {
292 JLOG(m_journal.debug())
293 << beast::leftw(18) << "Logic dropping " << remote_endpoint
294 << " as duplicate incoming";
295 return SlotImp::ptr();
296 }
297
298 // Create the slot
299 SlotImp::ptr const slot(std::make_shared<SlotImp>(
300 local_endpoint,
301 remote_endpoint,
302 fixed(remote_endpoint.address()),
303 m_clock));
304 // Add slot to table
305 auto const result(slots_.emplace(slot->remote_endpoint(), slot));
306 // Remote address must not already exist
307 XRPL_ASSERT(
308 result.second,
309 "ripple::PeerFinder::Logic::new_inbound_slot : remote endpoint "
310 "inserted");
311 // Add to the connected address list
312 connectedAddresses_.emplace(remote_endpoint.address());
313
314 // Update counts
315 counts_.add(*slot);
316
317 return result.first->second;
318 }
319
320 // Can't check for self-connect because we don't know the local endpoint
323 {
324 JLOG(m_journal.debug())
325 << beast::leftw(18) << "Logic connect " << remote_endpoint;
326
328
329 // Check for duplicate connection
330 if (slots_.find(remote_endpoint) != slots_.end())
331 {
332 JLOG(m_journal.debug())
333 << beast::leftw(18) << "Logic dropping " << remote_endpoint
334 << " as duplicate connect";
335 return SlotImp::ptr();
336 }
337
338 // Create the slot
339 SlotImp::ptr const slot(std::make_shared<SlotImp>(
340 remote_endpoint, fixed(remote_endpoint), m_clock));
341
342 // Add slot to table
343 auto const result = slots_.emplace(slot->remote_endpoint(), slot);
344 // Remote address must not already exist
345 XRPL_ASSERT(
346 result.second,
347 "ripple::PeerFinder::Logic::new_outbound_slot : remote endpoint "
348 "inserted");
349
350 // Add to the connected address list
351 connectedAddresses_.emplace(remote_endpoint.address());
352
353 // Update counts
354 counts_.add(*slot);
355
356 return result.first->second;
357 }
358
359 bool
361 SlotImp::ptr const& slot,
362 beast::IP::Endpoint const& local_endpoint)
363 {
364 JLOG(m_journal.trace())
365 << beast::leftw(18) << "Logic connected" << slot->remote_endpoint()
366 << " on local " << local_endpoint;
367
369
370 // The object must exist in our table
371 XRPL_ASSERT(
372 slots_.find(slot->remote_endpoint()) != slots_.end(),
373 "ripple::PeerFinder::Logic::onConnected : valid slot input");
374 // Assign the local endpoint now that it's known
375 slot->local_endpoint(local_endpoint);
376
377 // Check for self-connect by address
378 {
379 auto const iter(slots_.find(local_endpoint));
380 if (iter != slots_.end())
381 {
382 XRPL_ASSERT(
383 iter->second->local_endpoint() == slot->remote_endpoint(),
384 "ripple::PeerFinder::Logic::onConnected : local and remote "
385 "endpoints do match");
386 JLOG(m_journal.warn())
387 << beast::leftw(18) << "Logic dropping "
388 << slot->remote_endpoint() << " as self connect";
389 return false;
390 }
391 }
392
393 // Update counts
394 counts_.remove(*slot);
395 slot->state(Slot::connected);
396 counts_.add(*slot);
397 return true;
398 }
399
400 Result
401 activate(SlotImp::ptr const& slot, PublicKey const& key, bool reserved)
402 {
403 JLOG(m_journal.debug())
404 << beast::leftw(18) << "Logic handshake " << slot->remote_endpoint()
405 << " with " << (reserved ? "reserved " : "") << "key " << key;
406
408
409 // The object must exist in our table
410 XRPL_ASSERT(
411 slots_.find(slot->remote_endpoint()) != slots_.end(),
412 "ripple::PeerFinder::Logic::activate : valid slot input");
413 // Must be accepted or connected
414 XRPL_ASSERT(
415 slot->state() == Slot::accept || slot->state() == Slot::connected,
416 "ripple::PeerFinder::Logic::activate : valid slot state");
417
418 // Check for duplicate connection by key
419 if (keys_.find(key) != keys_.end())
420 return Result::duplicate;
421
422 // If the peer belongs to a cluster or is reserved,
423 // update the slot to reflect that.
424 counts_.remove(*slot);
425 slot->reserved(reserved);
426 counts_.add(*slot);
427
428 // See if we have an open space for this slot
429 if (!counts_.can_activate(*slot))
430 {
431 if (!slot->inbound())
432 bootcache_.on_success(slot->remote_endpoint());
433 return Result::full;
434 }
435
436 // Set the key right before adding to the map, otherwise we might
437 // assert later when erasing the key.
438 slot->public_key(key);
439 {
440 [[maybe_unused]] bool const inserted = keys_.insert(key).second;
441 // Public key must not already exist
442 XRPL_ASSERT(
443 inserted,
444 "ripple::PeerFinder::Logic::activate : public key inserted");
445 }
446
447 // Change state and update counts
448 counts_.remove(*slot);
449 slot->activate(m_clock.now());
450 counts_.add(*slot);
451
452 if (!slot->inbound())
453 bootcache_.on_success(slot->remote_endpoint());
454
455 // Mark fixed slot success
456 if (slot->fixed() && !slot->inbound())
457 {
458 auto iter(fixed_.find(slot->remote_endpoint()));
459 if (iter == fixed_.end())
461 "PeerFinder::Logic::activate(): remote_endpoint "
462 "missing from fixed_");
463
464 iter->second.success(m_clock.now());
465 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic fixed "
466 << slot->remote_endpoint() << " success";
467 }
468
469 return Result::success;
470 }
471
478 {
480 RedirectHandouts h(slot);
482 handout(&h, (&h) + 1, livecache_.hops.begin(), livecache_.hops.end());
483 return std::move(h.list());
484 }
485
489 // VFALCO TODO This should add the returned addresses to the
490 // squelch list in one go once the list is built,
491 // rather than having each module add to the squelch list.
494 {
496
498
499 // Count how many more outbound attempts to make
500 //
501 auto needed(counts_.attempts_needed());
502 if (needed == 0)
503 return none;
504
505 ConnectHandouts h(needed, m_squelches);
506
507 // Make sure we don't connect to already-connected entries.
508 for (auto const& s : slots_)
509 {
510 auto const result(
511 m_squelches.insert(s.second->remote_endpoint().address()));
512 if (!result.second)
513 m_squelches.touch(result.first);
514 }
515
516 // 1. Use Fixed if:
517 // Fixed active count is below fixed count AND
518 // ( There are eligible fixed addresses to try OR
519 // Any outbound attempts are in progress)
520 //
521 if (counts_.fixed_active() < fixed_.size())
522 {
523 get_fixed(needed, h.list(), m_squelches);
524
525 if (!h.list().empty())
526 {
527 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect "
528 << h.list().size() << " fixed";
529 return h.list();
530 }
531
532 if (counts_.attempts() > 0)
533 {
534 JLOG(m_journal.debug())
535 << beast::leftw(18) << "Logic waiting on "
536 << counts_.attempts() << " attempts";
537 return none;
538 }
539 }
540
541 // Only proceed if auto connect is enabled and we
542 // have less than the desired number of outbound slots
543 //
545 return none;
546
547 // 2. Use Livecache if:
548 // There are any entries in the cache OR
549 // Any outbound attempts are in progress
550 //
551 {
553 handout(
554 &h, (&h) + 1, livecache_.hops.rbegin(), livecache_.hops.rend());
555 if (!h.list().empty())
556 {
557 JLOG(m_journal.debug())
558 << beast::leftw(18) << "Logic connect " << h.list().size()
559 << " live "
560 << ((h.list().size() > 1) ? "endpoints" : "endpoint");
561 return h.list();
562 }
563 else if (counts_.attempts() > 0)
564 {
565 JLOG(m_journal.debug())
566 << beast::leftw(18) << "Logic waiting on "
567 << counts_.attempts() << " attempts";
568 return none;
569 }
570 }
571
572 /* 3. Bootcache refill
573 If the Bootcache is empty, try to get addresses from the current
574 set of Sources and add them into the Bootstrap cache.
575
576 Pseudocode:
577 If ( domainNames.count() > 0 AND (
578 unusedBootstrapIPs.count() == 0
579 OR activeNameResolutions.count() > 0) )
580 ForOneOrMore (DomainName that hasn't been resolved recently)
581 Contact DomainName and add entries to the
582 unusedBootstrapIPs return;
583 */
584
585 // 4. Use Bootcache if:
586 // There are any entries we haven't tried lately
587 //
588 for (auto iter(bootcache_.begin());
589 !h.full() && iter != bootcache_.end();
590 ++iter)
591 h.try_insert(*iter);
592
593 if (!h.list().empty())
594 {
595 JLOG(m_journal.debug())
596 << beast::leftw(18) << "Logic connect " << h.list().size()
597 << " boot "
598 << ((h.list().size() > 1) ? "addresses" : "address");
599 return h.list();
600 }
601
602 // If we get here we are stuck
603 return none;
604 }
605
608 {
610 result;
611
613
614 clock_type::time_point const now = m_clock.now();
615 if (m_whenBroadcast <= now)
616 {
618
619 {
620 // build list of active slots
622 slots.reserve(slots_.size());
624 slots_.cbegin(),
625 slots_.cend(),
626 [&slots](Slots::value_type const& value) {
627 if (value.second->state() == Slot::active)
628 slots.emplace_back(value.second);
629 });
630 std::shuffle(slots.begin(), slots.end(), default_prng());
631
632 // build target vector
633 targets.reserve(slots.size());
635 slots.cbegin(),
636 slots.cend(),
637 [&targets](SlotImp::ptr const& slot) {
638 targets.emplace_back(slot);
639 });
640 }
641
642 /* VFALCO NOTE
643 This is a temporary measure. Once we know our own IP
644 address, the correct solution is to put it into the Livecache
645 at hops 0, and go through the regular handout path. This way
646 we avoid handing our address out too frequenty, which this code
647 suffers from.
648 */
649 // Add an entry for ourselves if:
650 // 1. We want incoming
651 // 2. We have slots
652 // 3. We haven't failed the firewalled test
653 //
655 {
656 Endpoint ep;
657 ep.hops = 0;
658 // we use the unspecified (0) address here because the value is
659 // irrelevant to recipients. When peers receive an endpoint
660 // with 0 hops, they use the socket remote_addr instead of the
661 // value in the message. Furthermore, since the address value
662 // is ignored, the type/version (ipv4 vs ipv6) doesn't matter
663 // either. ipv6 has a slightly more compact string
664 // representation of 0, so use that for self entries.
667 for (auto& t : targets)
668 t.insert(ep);
669 }
670
671 // build sequence of endpoints by hops
673 handout(
674 targets.begin(),
675 targets.end(),
678
679 // broadcast
680 for (auto const& t : targets)
681 {
682 SlotImp::ptr const& slot = t.slot();
683 auto const& list = t.list();
684 JLOG(m_journal.trace())
685 << beast::leftw(18) << "Logic sending "
686 << slot->remote_endpoint() << " with " << list.size()
687 << ((list.size() == 1) ? " endpoint" : " endpoints");
688 result.push_back(std::make_pair(slot, list));
689 }
690
692 }
693
694 return result;
695 }
696
697 void
699 {
701
702 // Expire the Livecache
704
705 // Expire the recent cache in each slot
706 for (auto const& entry : slots_)
707 entry.second->expire();
708
709 // Expire the recent attempts table
711
713 }
714
715 //--------------------------------------------------------------------------
716
717 // Validate and clean up the list that we received from the slot.
718 void
720 {
721 bool neighbor(false);
722 for (auto iter = list.begin(); iter != list.end();)
723 {
724 Endpoint& ep(*iter);
725
726 // Enforce hop limit
727 if (ep.hops > Tuning::maxHops)
728 {
729 JLOG(m_journal.debug())
730 << beast::leftw(18) << "Endpoints drop " << ep.address
731 << " for excess hops " << ep.hops;
732 iter = list.erase(iter);
733 continue;
734 }
735
736 // See if we are directly connected
737 if (ep.hops == 0)
738 {
739 if (!neighbor)
740 {
741 // Fill in our neighbors remote address
742 neighbor = true;
743 ep.address =
744 slot->remote_endpoint().at_port(ep.address.port());
745 }
746 else
747 {
748 JLOG(m_journal.debug())
749 << beast::leftw(18) << "Endpoints drop " << ep.address
750 << " for extra self";
751 iter = list.erase(iter);
752 continue;
753 }
754 }
755
756 // Discard invalid addresses
757 if (!is_valid_address(ep.address))
758 {
759 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop "
760 << ep.address << " as invalid";
761 iter = list.erase(iter);
762 continue;
763 }
764
765 // Filter duplicates
766 if (std::any_of(
767 list.begin(),
768 iter,
769 [ep](Endpoints::value_type const& other) {
770 return ep.address == other.address;
771 }))
772 {
773 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop "
774 << ep.address << " as duplicate";
775 iter = list.erase(iter);
776 continue;
777 }
778
779 // Increment hop count on the incoming message, so
780 // we store it at the hop count we will send it at.
781 //
782 ++ep.hops;
783
784 ++iter;
785 }
786 }
787
788 void
790 {
791 // If we're sent too many endpoints, sample them at random:
793 {
794 std::shuffle(list.begin(), list.end(), default_prng());
796 }
797
798 JLOG(m_journal.trace())
799 << beast::leftw(18) << "Endpoints from " << slot->remote_endpoint()
800 << " contained " << list.size()
801 << ((list.size() > 1) ? " entries" : " entry");
802
804
805 // The object must exist in our table
806 XRPL_ASSERT(
807 slots_.find(slot->remote_endpoint()) != slots_.end(),
808 "ripple::PeerFinder::Logic::on_endpoints : valid slot input");
809
810 // Must be handshaked!
811 XRPL_ASSERT(
812 slot->state() == Slot::active,
813 "ripple::PeerFinder::Logic::on_endpoints : valid slot state");
814
816
817 // Limit how often we accept new endpoints
818 if (slot->whenAcceptEndpoints > now)
819 return;
820
821 preprocess(slot, list);
822
823 for (auto const& ep : list)
824 {
825 XRPL_ASSERT(
826 ep.hops,
827 "ripple::PeerFinder::Logic::on_endpoints : nonzero hops");
828
829 slot->recent.insert(ep.address, ep.hops);
830
831 // Note hops has been incremented, so 1
832 // means a directly connected neighbor.
833 //
834 if (ep.hops == 1)
835 {
836 if (slot->connectivityCheckInProgress)
837 {
838 JLOG(m_journal.debug())
839 << beast::leftw(18) << "Logic testing " << ep.address
840 << " already in progress";
841 continue;
842 }
843
844 if (!slot->checked)
845 {
846 // Mark that a check for this slot is now in progress.
847 slot->connectivityCheckInProgress = true;
848
849 // Test the slot's listening port before
850 // adding it to the livecache for the first time.
851 //
853 ep.address,
854 std::bind(
856 this,
857 slot->remote_endpoint(),
858 ep.address,
859 std::placeholders::_1));
860
861 // Note that we simply discard the first Endpoint
862 // that the neighbor sends when we perform the
863 // listening test. They will just send us another
864 // one in a few seconds.
865
866 continue;
867 }
868
869 // If they failed the test then skip the address
870 if (!slot->canAccept)
871 continue;
872 }
873
874 // We only add to the livecache if the neighbor passed the
875 // listening test, else we silently drop neighbor endpoint
876 // since their listening port is misconfigured.
877 //
878 livecache_.insert(ep);
879 bootcache_.insert(ep.address);
880 }
881
882 slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage;
883 }
884
885 //--------------------------------------------------------------------------
886
887 void
888 remove(SlotImp::ptr const& slot)
889 {
890 {
891 auto const iter = slots_.find(slot->remote_endpoint());
892 // The slot must exist in the table
893 if (iter == slots_.end())
895 "PeerFinder::Logic::remove(): remote_endpoint "
896 "missing from slots_");
897
898 // Remove from slot by IP table
899 slots_.erase(iter);
900 }
901 // Remove the key if present
902 if (slot->public_key() != std::nullopt)
903 {
904 auto const iter = keys_.find(*slot->public_key());
905 // Key must exist
906 if (iter == keys_.end())
908 "PeerFinder::Logic::remove(): public_key missing "
909 "from keys_");
910
911 keys_.erase(iter);
912 }
913 // Remove from connected address table
914 {
915 auto const iter(
916 connectedAddresses_.find(slot->remote_endpoint().address()));
917 // Address must exist
918 if (iter == connectedAddresses_.end())
920 "PeerFinder::Logic::remove(): remote_endpont "
921 "address missing from connectedAddresses_");
922
924 }
925
926 // Update counts
927 counts_.remove(*slot);
928 }
929
930 void
932 {
934
935 remove(slot);
936
937 // Mark fixed slot failure
938 if (slot->fixed() && !slot->inbound() && slot->state() != Slot::active)
939 {
940 auto iter(fixed_.find(slot->remote_endpoint()));
941 if (iter == fixed_.end())
943 "PeerFinder::Logic::on_closed(): remote_endpont "
944 "missing from fixed_");
945
946 iter->second.failure(m_clock.now());
947 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic fixed "
948 << slot->remote_endpoint() << " failed";
949 }
950
951 // Do state specific bookkeeping
952 switch (slot->state())
953 {
954 case Slot::accept:
955 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic accept "
956 << slot->remote_endpoint() << " failed";
957 break;
958
959 case Slot::connect:
960 case Slot::connected:
961 bootcache_.on_failure(slot->remote_endpoint());
962 // VFALCO TODO If the address exists in the ephemeral/live
963 // endpoint livecache then we should mark the
964 // failure
965 // as if it didn't pass the listening test. We should also
966 // avoid propagating the address.
967 break;
968
969 case Slot::active:
970 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic close "
971 << slot->remote_endpoint();
972 break;
973
974 case Slot::closing:
975 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic finished "
976 << slot->remote_endpoint();
977 break;
978
979 default:
980 UNREACHABLE(
981 "ripple::PeerFinder::Logic::on_closed : invalid slot "
982 "state");
983 break;
984 }
985 }
986
987 void
989 {
991
992 bootcache_.on_failure(slot->remote_endpoint());
993 }
994
995 // Insert a set of redirect IP addresses into the Bootcache
996 template <class FwdIter>
997 void
999 FwdIter first,
1000 FwdIter last,
1001 boost::asio::ip::tcp::endpoint const& remote_address);
1002
1003 //--------------------------------------------------------------------------
1004
1005 // Returns `true` if the address matches a fixed slot address
1006 // Must have the lock held
1007 bool
1008 fixed(beast::IP::Endpoint const& endpoint) const
1009 {
1010 for (auto const& entry : fixed_)
1011 if (entry.first == endpoint)
1012 return true;
1013 return false;
1014 }
1015
1016 // Returns `true` if the address matches a fixed slot address
1017 // Note that this does not use the port information in the IP::Endpoint
1018 // Must have the lock held
1019 bool
1020 fixed(beast::IP::Address const& address) const
1021 {
1022 for (auto const& entry : fixed_)
1023 if (entry.first.address() == address)
1024 return true;
1025 return false;
1026 }
1027
1028 //--------------------------------------------------------------------------
1029 //
1030 // Connection Strategy
1031 //
1032 //--------------------------------------------------------------------------
1033
1035 template <class Container>
1036 void
1038 std::size_t needed,
1039 Container& c,
1040 typename ConnectHandouts::Squelches& squelches)
1041 {
1042 auto const now(m_clock.now());
1043 for (auto iter = fixed_.begin(); needed && iter != fixed_.end(); ++iter)
1044 {
1045 auto const& address(iter->first.address());
1046 if (iter->second.when() <= now &&
1047 squelches.find(address) == squelches.end() &&
1049 slots_.cbegin(),
1050 slots_.cend(),
1051 [address](Slots::value_type const& v) {
1052 return address == v.first.address();
1053 }))
1054 {
1055 squelches.insert(iter->first.address());
1056 c.push_back(iter->first);
1057 --needed;
1058 }
1059 }
1060 }
1061
1062 //--------------------------------------------------------------------------
1063
1064 void
1066 {
1067 fetch(source);
1068 }
1069
1070 void
1072 {
1073 m_sources.push_back(source);
1074 }
1075
1076 //--------------------------------------------------------------------------
1077 //
1078 // Bootcache livecache sources
1079 //
1080 //--------------------------------------------------------------------------
1081
1082 // Add a set of addresses.
1083 // Returns the number of addresses added.
1084 //
1085 int
1087 {
1088 int count(0);
1090 for (auto addr : list)
1091 {
1092 if (bootcache_.insertStatic(addr))
1093 ++count;
1094 }
1095 return count;
1096 }
1097
1098 // Fetch bootcache addresses from the specified source.
1099 void
1101 {
1102 Source::Results results;
1103
1104 {
1105 {
1107 if (stopping_)
1108 return;
1109 fetchSource_ = source;
1110 }
1111
1112 // VFALCO NOTE The fetch is synchronous,
1113 // not sure if that's a good thing.
1114 //
1115 source->fetch(results, m_journal);
1116
1117 {
1119 if (stopping_)
1120 return;
1121 fetchSource_ = nullptr;
1122 }
1123 }
1124
1125 if (!results.error)
1126 {
1127 int const count(addBootcacheAddresses(results.addresses));
1128 JLOG(m_journal.info())
1129 << beast::leftw(18) << "Logic added " << count << " new "
1130 << ((count == 1) ? "address" : "addresses") << " from "
1131 << source->name();
1132 }
1133 else
1134 {
1135 JLOG(m_journal.error()) << beast::leftw(18) << "Logic failed "
1136 << "'" << source->name() << "' fetch, "
1137 << results.error.message();
1138 }
1139 }
1140
1141 //--------------------------------------------------------------------------
1142 //
1143 // Endpoint message handling
1144 //
1145 //--------------------------------------------------------------------------
1146
1147 // Returns true if the IP::Endpoint contains no invalid data.
1148 bool
1150 {
1151 if (is_unspecified(address))
1152 return false;
1153 if (!is_public(address))
1154 return false;
1155 if (address.port() == 0)
1156 return false;
1157 return true;
1158 }
1159
1160 //--------------------------------------------------------------------------
1161 //
1162 // PropertyStream
1163 //
1164 //--------------------------------------------------------------------------
1165
1166 void
1168 {
1169 for (auto const& entry : slots)
1170 {
1172 SlotImp const& slot(*entry.second);
1173 if (slot.local_endpoint() != std::nullopt)
1174 item["local_address"] = to_string(*slot.local_endpoint());
1175 item["remote_address"] = to_string(slot.remote_endpoint());
1176 if (slot.inbound())
1177 item["inbound"] = "yes";
1178 if (slot.fixed())
1179 item["fixed"] = "yes";
1180 if (slot.reserved())
1181 item["reserved"] = "yes";
1182
1183 item["state"] = stateString(slot.state());
1184 }
1185 }
1186
1187 void
1189 {
1191
1192 // VFALCO NOTE These ugly casts are needed because
1193 // of how std::size_t is declared on some linuxes
1194 //
1195 map["bootcache"] = std::uint32_t(bootcache_.size());
1196 map["fixed"] = std::uint32_t(fixed_.size());
1197
1198 {
1199 beast::PropertyStream::Set child("peers", map);
1200 writeSlots(child, slots_);
1201 }
1202
1203 {
1204 beast::PropertyStream::Map child("counts", map);
1205 counts_.onWrite(child);
1206 }
1207
1208 {
1209 beast::PropertyStream::Map child("config", map);
1210 config_.onWrite(child);
1211 }
1212
1213 {
1214 beast::PropertyStream::Map child("livecache", map);
1215 livecache_.onWrite(child);
1216 }
1217
1218 {
1219 beast::PropertyStream::Map child("bootcache", map);
1220 bootcache_.onWrite(child);
1221 }
1222 }
1223
1224 //--------------------------------------------------------------------------
1225 //
1226 // Diagnostics
1227 //
1228 //--------------------------------------------------------------------------
1229
1230 Counts const&
1231 counts() const
1232 {
1233 return counts_;
1234 }
1235
1236 static std::string
1238 {
1239 switch (state)
1240 {
1241 case Slot::accept:
1242 return "accept";
1243 case Slot::connect:
1244 return "connect";
1245 case Slot::connected:
1246 return "connected";
1247 case Slot::active:
1248 return "active";
1249 case Slot::closing:
1250 return "closing";
1251 default:
1252 break;
1253 };
1254 return "?";
1255 }
1256};
1257
1258//------------------------------------------------------------------------------
1259
1260template <class Checker>
1261template <class FwdIter>
1262void
1264 FwdIter first,
1265 FwdIter last,
1266 boost::asio::ip::tcp::endpoint const& remote_address)
1267{
1268 std::lock_guard _(lock_);
1269 std::size_t n = 0;
1270 for (; first != last && n < Tuning::maxRedirects; ++first, ++n)
1271 bootcache_.insert(beast::IPAddressConversion::from_asio(*first));
1272 if (n > 0)
1273 {
1274 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic add " << n
1275 << " redirect IPs from " << remote_address;
1276 }
1277}
1278
1279} // namespace PeerFinder
1280} // namespace ripple
1281
1282#endif
T any_of(T... args)
T cbegin(T... args)
T bind(T... args)
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
Address const & address() const
Returns the address portion of this endpoint.
Definition: IPEndpoint.h:75
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition: IPEndpoint.h:68
Port port() const
Returns the port number on the endpoint.
Definition: IPEndpoint.h:61
A generic endpoint for log messages.
Definition: Journal.h:60
Stream error() const
Definition: Journal.h:346
Stream debug() const
Definition: Journal.h:328
Stream info() const
Definition: Journal.h:334
Stream trace() const
Severity stream access functions.
Definition: Journal.h:322
Stream warn() const
Definition: Journal.h:340
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
Stores IP addresses useful for gaining initial connections.
Definition: Bootcache.h:54
map_type::size_type size() const
Returns the number of entries in the cache.
Definition: Bootcache.cpp:50
const_iterator end() const
Definition: Bootcache.cpp:68
void periodicActivity()
Stores the cache in the persistent database on a timer.
Definition: Bootcache.cpp:204
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
Definition: Bootcache.cpp:112
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
Definition: Bootcache.cpp:176
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
Definition: Bootcache.cpp:56
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
Definition: Bootcache.cpp:212
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
Definition: Bootcache.cpp:148
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a staticallyconfigured address to the cache.
Definition: Bootcache.cpp:126
void load()
Load the persisted data from the Store into the container.
Definition: Bootcache.cpp:89
Tests remote listening sockets to make sure they are connectible.
Definition: Checker.h:39
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Definition: Checker.h:206
Receives handouts for making automatic connections.
Definition: Handouts.h:272
bool try_insert(beast::IP::Endpoint const &endpoint)
Definition: Handouts.h:333
Manages the count of available connections for the various slots.
Definition: Counts.h:34
std::size_t fixed_active() const
Returns the number of active fixed connections.
Definition: Counts.h:127
int out_active() const
Returns the number of outbound peers assigned an open slot.
Definition: Counts.h:113
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
Definition: Counts.h:88
int inboundSlots() const
Returns the total number of inbound slots.
Definition: Counts.h:166
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition: Counts.h:228
int out_max() const
Returns the total number of outbound slots.
Definition: Counts.h:104
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
Definition: Counts.h:56
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
Definition: Counts.h:63
void onConfig(Config const &config)
Called when the config is set or changed.
Definition: Counts.h:136
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
Definition: Counts.h:70
std::size_t attempts() const
Returns the number of outbound connection attempts.
Definition: Counts.h:97
void shuffle()
Shuffle each hop list.
Definition: Livecache.h:499
The Livecache holds the short-lived relayed Endpoint messages.
Definition: Livecache.h:198
void expire()
Erase entries whose time has expired.
Definition: Livecache.h:407
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
Definition: Livecache.h:429
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition: Livecache.h:476
class ripple::PeerFinder::Livecache::hops_t hops
The Logic for maintaining the list of Slot addresses.
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
void fetch(std::shared_ptr< Source > const &source)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void on_closed(SlotImp::ptr const &slot)
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void on_failure(SlotImp::ptr const &slot)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
SlotImp::ptr new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
bool fixed(beast::IP::Address const &address) const
void remove(SlotImp::ptr const &slot)
std::multiset< beast::IP::Address > connectedAddresses_
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
SlotImp::ptr new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
std::vector< std::shared_ptr< Source > > m_sources
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void addStaticSource(std::shared_ptr< Source > const &source)
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::map< beast::IP::Endpoint, Fixed > fixed_
static std::string stateString(Slot::State state)
Receives handouts for redirecting a connection.
Definition: Handouts.h:104
std::vector< Endpoint > & list()
Definition: Handouts.h:126
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
Definition: SlotImp.h:83
bool fixed() const override
Returns true if this is a fixed connection.
Definition: SlotImp.h:59
bool inbound() const override
Returns true if this is an inbound connection.
Definition: SlotImp.h:53
std::shared_ptr< SlotImp > ptr
Definition: SlotImp.h:37
void set_listening_port(std::uint16_t port)
Definition: SlotImp.h:104
State state() const override
Returns the state of the connection.
Definition: SlotImp.h:71
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
Definition: SlotImp.h:77
bool reserved() const override
Returns true if this is a reserved connection.
Definition: SlotImp.h:65
Abstract persistence for PeerFinder data.
Definition: Store.h:29
A public key.
Definition: PublicKey.h:61
T count(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T forward_as_tuple(T... args)
T make_pair(T... args)
T make_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
Definition: IPAddressV6.h:30
boost::asio::ip::address Address
Definition: IPAddress.h:39
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::chrono::seconds constexpr secondsPerMessage(151)
std::chrono::seconds constexpr recentAttemptDuration(60)
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Definition: Handouts.h:68
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:25
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
Definition: BasicConfig.h:315
std::string to_string(base_uint< Bits, Tag > const &a)
Definition: base_uint.h:630
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T ref(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
Definition: iosformat.h:34
PeerFinder configuration settings.
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
bool autoConnect
true if we want to establish connections automatically
int ipLimit
Limit how many incoming connections we allow per IP.
bool wantIncoming
true if we want to accept incoming connections.
std::uint16_t listeningPort
The listening port number.
Describes a connectible peer address along with some metadata.
The results of a fetch.
Definition: Source.h:43
boost::system::error_code error
Definition: Source.h:47