rippled
Loading...
Searching...
No Matches
OverlayImpl.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/misc/HashRouter.h>
21#include <xrpld/app/misc/NetworkOPs.h>
22#include <xrpld/app/misc/ValidatorList.h>
23#include <xrpld/app/misc/ValidatorSite.h>
24#include <xrpld/app/rdb/RelationalDatabase.h>
25#include <xrpld/app/rdb/Wallet.h>
26#include <xrpld/overlay/Cluster.h>
27#include <xrpld/overlay/detail/ConnectAttempt.h>
28#include <xrpld/overlay/detail/PeerImp.h>
29#include <xrpld/overlay/detail/TrafficCount.h>
30#include <xrpld/overlay/detail/Tuning.h>
31#include <xrpld/overlay/predicates.h>
32#include <xrpld/peerfinder/make_Manager.h>
33#include <xrpld/rpc/handlers/GetCounts.h>
34#include <xrpld/rpc/json_body.h>
35
36#include <xrpl/basics/base64.h>
37#include <xrpl/basics/make_SSLContext.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/beast/core/LexicalCast.h>
40#include <xrpl/protocol/STTx.h>
41#include <xrpl/server/SimpleWriter.h>
42
43#include <boost/algorithm/string/predicate.hpp>
44#include <boost/asio/executor_work_guard.hpp>
45
46namespace ripple {
47
// Bit flags, one bit per enumerator (each value is a distinct power of two),
// so several options can be combined with bitwise OR.
// NOTE(review): embedded line 50 is absent from this extract, so an
// enumerator may be missing here - confirm against the original file.
// Presumably these select which sections the "/crawl" response includes.
48namespace CrawlOptions {
49enum {
51 Overlay = (1 << 0),
52 ServerInfo = (1 << 1),
53 ServerCounts = (1 << 2),
54 Unl = (1 << 3)
55};
56}
57
58//------------------------------------------------------------------------------
59
60OverlayImpl::Child::Child(OverlayImpl& overlay) : overlay_(overlay)
61{
62}
63
// NOTE(review): the declarator line for this body is missing from the
// extract (presumably the Child destructor - confirm).
// Hands this object to overlay_.remove() so the overlay can drop it.
65{
66 overlay_.remove(*this);
67}
68
69//------------------------------------------------------------------------------
70
// NOTE(review): the declarator line is missing from the extract (presumably
// the Timer constructor taking `overlay` - confirm).
// Initializes the Child base with `overlay` and constructs timer_ from the
// overlay's io_context_. The body is empty.
72 : Child(overlay), timer_(overlay_.io_context_)
73{
74}
75
// NOTE(review): the name line (embedded line 77) is missing from the
// extract; presumably this is Timer::stop.
// Marks the timer as stopping, so that on_timer returns early, and cancels
// any wait pending on timer_.
76void
78{
79 // This method is only ever called from the same strand that calls
80 // Timer::on_timer, ensuring they never execute concurrently.
81 stopping_ = true;
82 timer_.cancel();
83}
84
// NOTE(review): embedded lines 86 (name line) and 91 are missing from the
// extract; presumably this is Timer::async_wait and line 91 opens the bind.
// Arms timer_ to expire one second from now and starts an asynchronous wait.
// The completion handler is Timer::on_timer, bound to shared_from_this() and
// wrapped with bind_executor on overlay_.strand_.
85void
87{
88 timer_.expires_after(std::chrono::seconds(1));
89 timer_.async_wait(boost::asio::bind_executor(
90 overlay_.strand_,
92 &Timer::on_timer, shared_from_this(), std::placeholders::_1)));
93}
94
// NOTE(review): the name line (embedded line 96) is missing from the
// extract; the log text below indicates this is on_timer, taking an error
// code `ec`.
// Timer completion handler. Returns without re-arming when `ec` is set or
// stopping_ is true; otherwise performs the periodic overlay work and
// schedules the next wait.
95void
97{
98 if (ec || stopping_)
99 {
// A cancelled wait (operation_aborted) is expected and is not logged.
100 if (ec && ec != boost::asio::error::operation_aborted)
101 {
102 JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
103 }
104 return;
105 }
106
107 overlay_.m_peerFinder->once_per_second();
108 overlay_.sendEndpoints();
109 overlay_.autoConnect();
110 if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
111 overlay_.sendTxQueue();
112
// Idle peers are checked only on every Tuning::checkIdlePeers-th tick.
113 if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
114 overlay_.deleteIdlePeers();
115
// Re-arm the timer for the next tick.
116 async_wait();
117}
118
119//------------------------------------------------------------------------------
120
// NOTE(review): embedded lines 121 (declarator), 125, 137, 152 and 161 are
// missing from this extract; presumably this is the OverlayImpl constructor.
// Stores the supplied collaborators, creates a work guard and a strand on
// io_context_, builds the PeerFinder manager, zeroes the timer counter,
// starts peer IDs at 1, and sets up the metrics members.
122 Application& app,
123 Setup const& setup,
124 ServerHandler& serverHandler,
126 Resolver& resolver,
127 boost::asio::io_context& io_context,
128 BasicConfig const& config,
129 beast::insight::Collector::ptr const& collector)
130 : app_(app)
131 , io_context_(io_context)
132 , work_(std::in_place, boost::asio::make_work_guard(io_context_))
133 , strand_(boost::asio::make_strand(io_context_))
134 , setup_(setup)
135 , journal_(app_.journal("Overlay"))
136 , serverHandler_(serverHandler)
138 , m_peerFinder(PeerFinder::make_Manager(
139 io_context,
140 stopwatch(),
141 app_.journal("PeerFinder"),
142 config,
143 collector))
144 , m_resolver(resolver)
145 , next_id_(1)
146 , timer_count_(0)
147 , slots_(app.logs(), *this, app.config())
148 , m_stats(
149 std::bind(&OverlayImpl::collect_metrics, this),
150 collector,
// Immediately-invoked lambda: builds one TrafficGauges entry per traffic
// category returned by m_traffic.getCounts() and returns the container.
151 [counts = m_traffic.getCounts(), collector]() {
153
154 for (auto const& pair : counts)
155 ret.emplace(
156 pair.first, TrafficGauges(pair.second.name, collector));
157
158 return ret;
159 }())
160{
162}
163
// NOTE(review): embedded lines 165 (name line), 193-194, 199-200, 258-259
// and 277 are missing from this extract; the assertion text below indicates
// this is OverlayImpl::onHandoff.
// Handles an inbound HTTP request together with its stream.
//  - If processRequest() handles the request, its Handoff is returned.
//  - If the request is not a peer upgrade, an unmodified Handoff is returned.
//  - Otherwise a PeerFinder slot is obtained, the request is validated
//    ("Connect-As", protocol version, shared value, handshake) and the slot
//    is activated. On success a PeerImp is created, registered and run, and
//    the Handoff has moved == true. On the rejection paths that release the
//    slot, moved is reset to false and an HTTP response is stored in the
//    Handoff.
164Handoff
166 std::unique_ptr<stream_type>&& stream_ptr,
167 http_request_type&& request,
168 endpoint_type remote_endpoint)
169{
170 auto const id = next_id_++;
171 beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
172 beast::Journal journal(sink);
173
174 Handoff handoff;
175 if (processRequest(request, handoff))
176 return handoff;
177 if (!isPeerUpgrade(request))
178 return handoff;
179
180 handoff.moved = true;
181
182 JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;
183
184 error_code ec;
185 auto const local_endpoint(
186 stream_ptr->next_layer().socket().local_endpoint(ec));
187 if (ec)
188 {
189 JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
190 return handoff;
191 }
192
// NOTE(review): the definition of `consumer` (embedded lines 193-194) is not
// visible in this extract.
195 if (consumer.disconnect(journal))
196 return handoff;
197
198 auto const slot = m_peerFinder->new_inbound_slot(
201
202 if (slot == nullptr)
203 {
204 // self-connect, close
205 handoff.moved = false;
206 return handoff;
207 }
208
209 // Validate HTTP request
210
211 {
// The "Connect-As" header must list "peer" (case-insensitive); otherwise a
// redirect response is returned instead of accepting the connection.
212 auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
213 if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
214 return boost::iequals(s, "peer");
215 }) == types.end())
216 {
217 handoff.moved = false;
218 handoff.response =
219 makeRedirectResponse(slot, request, remote_endpoint.address());
220 handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
221 return handoff;
222 }
223 }
224
225 auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
226 if (!negotiatedVersion)
227 {
228 m_peerFinder->on_closed(slot);
229 handoff.moved = false;
230 handoff.response = makeErrorResponse(
231 slot,
232 request,
233 remote_endpoint.address(),
234 "Unable to agree on a protocol version");
235 handoff.keep_alive = false;
236 return handoff;
237 }
238
239 auto const sharedValue = makeSharedValue(*stream_ptr, journal);
240 if (!sharedValue)
241 {
242 m_peerFinder->on_closed(slot);
243 handoff.moved = false;
244 handoff.response = makeErrorResponse(
245 slot,
246 request,
247 remote_endpoint.address(),
248 "Incorrect security cookie");
249 handoff.keep_alive = false;
250 return handoff;
251 }
252
// Exceptions thrown below are caught at the end of the function and turned
// into an error response.
253 try
254 {
255 auto publicKey = verifyHandshake(
256 request,
257 *sharedValue,
260 remote_endpoint.address(),
261 app_);
262
263 {
264 // The node gets a reserved slot if it is in our cluster
265 // or if it has a reservation.
266 bool const reserved =
267 static_cast<bool>(app_.cluster().member(publicKey)) ||
268 app_.peerReservations().contains(publicKey);
269 auto const result =
270 m_peerFinder->activate(slot, publicKey, reserved);
271 if (result != PeerFinder::Result::success)
272 {
273 m_peerFinder->on_closed(slot);
274 JLOG(journal.debug()) << "Peer " << remote_endpoint
275 << " redirected, " << to_string(result);
276 handoff.moved = false;
278 slot, request, remote_endpoint.address());
279 handoff.keep_alive = false;
280 return handoff;
281 }
282 }
283
284 auto const peer = std::make_shared<PeerImp>(
285 app_,
286 id,
287 slot,
288 std::move(request),
289 publicKey,
290 *negotiatedVersion,
291 consumer,
292 std::move(stream_ptr),
293 *this);
294 {
295 // As we are not on the strand, run() must be called
296 // while holding the lock, otherwise new I/O can be
297 // queued after a call to stop().
298 std::lock_guard<decltype(mutex_)> lock(mutex_);
299 {
300 auto const result = m_peers.emplace(peer->slot(), peer);
301 XRPL_ASSERT(
302 result.second,
303 "ripple::OverlayImpl::onHandoff : peer is inserted");
304 (void)result.second;
305 }
306 list_.emplace(peer.get(), peer);
307
308 peer->run();
309 }
310 handoff.moved = true;
311 return handoff;
312 }
313 catch (std::exception const& e)
314 {
315 JLOG(journal.debug()) << "Peer " << remote_endpoint
316 << " fails handshake (" << e.what() << ")";
317
318 m_peerFinder->on_closed(slot);
319 handoff.moved = false;
320 handoff.response = makeErrorResponse(
321 slot, request, remote_endpoint.address(), e.what());
322 handoff.keep_alive = false;
323 return handoff;
324 }
325}
326
327//------------------------------------------------------------------------------
328
// NOTE(review): the name line (embedded line 330) is missing from the
// extract; presumably this is isPeerUpgrade, taking `request`.
// Returns true only when `request` is an HTTP upgrade and its "Upgrade"
// header yields at least one entry from parseProtocolVersions.
329bool
331{
332 if (!is_upgrade(request))
333 return false;
334 auto const versions = parseProtocolVersions(request["Upgrade"]);
335 return !versions.empty();
336}
337
// NOTE(review): the signature (embedded lines 338-339) and the declaration
// of `ss` (line 341) are missing from the extract; presumably makePrefix(id).
// Formats `id` as "[NNN] " - zero-padded to a minimum width of three - and
// returns the resulting string.
340{
342 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
343 return ss.str();
344}
345
// NOTE(review): embedded lines 346-348 (return type, name, first
// parameter), 357 and 370 are missing from the extract; presumably this is
// makeRedirectResponse, which also takes `slot`.
// Builds a 503 Service Unavailable JSON response whose body holds a
// "peer-ips" array filled from m_peerFinder->redirect(slot). The response
// also carries Server, Remote-Address, Content-Type and "Connection: close"
// headers.
349 http_request_type const& request,
350 address_type remote_address)
351{
352 boost::beast::http::response<json_body> msg;
353 msg.version(request.version());
354 msg.result(boost::beast::http::status::service_unavailable);
355 msg.insert("Server", BuildInfo::getFullVersionString());
356 {
358 ostr << remote_address;
359 msg.insert("Remote-Address", ostr.str());
360 }
361 msg.insert("Content-Type", "application/json");
362 msg.insert(boost::beast::http::field::connection, "close");
363 msg.body() = Json::objectValue;
364 {
365 Json::Value& ips = (msg.body()["peer-ips"] = Json::arrayValue);
366 for (auto const& _ : m_peerFinder->redirect(slot))
367 ips.append(_.address.to_string());
368 }
369 msg.prepare_payload();
371}
372
// NOTE(review): embedded lines 373-375 (return type, name, first parameter)
// and 388 are missing from the extract; presumably this is makeErrorResponse.
// Builds a 400 Bad Request response with an empty body. The reason phrase
// embeds `text`, and the response carries Server, Remote-Address and
// "Connection: close" headers.
376 http_request_type const& request,
377 address_type remote_address,
378 std::string text)
379{
380 boost::beast::http::response<boost::beast::http::empty_body> msg;
381 msg.version(request.version());
382 msg.result(boost::beast::http::status::bad_request);
383 msg.reason("Bad Request (" + text + ")");
384 msg.insert("Server", BuildInfo::getFullVersionString());
385 msg.insert("Remote-Address", remote_address.to_string());
386 msg.insert(boost::beast::http::field::connection, "close");
387 msg.prepare_payload();
389}
390
391//------------------------------------------------------------------------------
392
// NOTE(review): embedded lines 394 (name line), 412, 414-415, 417 and 423
// are missing from the extract; the assertion text indicates this is
// OverlayImpl::connect, taking `remote_endpoint`.
// Starts an outbound connection attempt. Gives up (with a log entry) when
// the resource manager reports the endpoint should be disconnected or when
// PeerFinder has no outbound slot. Otherwise creates the attempt object `p`,
// records it in list_ and runs it.
393void
395{
396 XRPL_ASSERT(work_, "ripple::OverlayImpl::connect : work is set");
397
398 auto usage = resourceManager().newOutboundEndpoint(remote_endpoint);
399 if (usage.disconnect(journal_))
400 {
401 JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
402 return;
403 }
404
405 auto const slot = peerFinder().new_outbound_slot(remote_endpoint);
406 if (slot == nullptr)
407 {
408 JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint;
409 return;
410 }
411
// NOTE(review): the start of this construction expression (the declaration
// of `p`) is not visible in this extract.
413 app_,
416 usage,
418 next_id_++,
419 slot,
420 app_.journal("Peer"),
421 *this);
422
424 list_.emplace(p.get(), p);
425 p->run();
426}
427
428//------------------------------------------------------------------------------
429
// NOTE(review): embedded lines 432 (name line), 434 and 446 are missing from
// the extract; the assertion text indicates this is OverlayImpl::add_active,
// taking `peer`. Line 434 presumably takes the lock that the comment before
// run() refers to - confirm.
// Registers `peer` in m_peers (keyed by slot), ids_ (keyed by peer ID) and
// list_, logs the activation, then calls peer->run().
430// Adds a peer that is already handshaked and active
431void
433{
435
436 {
437 auto const result = m_peers.emplace(peer->slot(), peer);
438 XRPL_ASSERT(
439 result.second,
440 "ripple::OverlayImpl::add_active : peer is inserted");
441 (void)result.second;
442 }
443
444 {
445 auto const result = ids_.emplace(
447 std::make_tuple(peer->id()),
448 std::make_tuple(peer));
449 XRPL_ASSERT(
450 result.second,
451 "ripple::OverlayImpl::add_active : peer ID is inserted");
452 (void)result.second;
453 }
454
455 list_.emplace(peer.get(), peer);
456
457 JLOG(journal_.debug()) << "activated " << peer->getRemoteAddress() << " ("
458 << peer->id() << ":"
459 << toBase58(
460 TokenType::NodePublic, peer->getNodePublic())
461 << ")";
462
463 // As we are not on the strand, run() must be called
464 // while holding the lock, otherwise new I/O can be
465 // queued after a call to stop().
466 peer->run();
467}
468
// NOTE(review): embedded lines 470 (name line) and 472 are missing from the
// extract; the assertion text indicates this is OverlayImpl::remove, taking
// `slot`.
// Erases the m_peers entry for `slot`. The entry is expected to exist; this
// is checked with XRPL_ASSERT only, and the erase runs unconditionally.
469void
471{
473 auto const iter = m_peers.find(slot);
474 XRPL_ASSERT(
475 iter != m_peers.end(), "ripple::OverlayImpl::remove : valid input");
476 m_peers.erase(iter);
477}
478
// NOTE(review): embedded lines 480 (name line), 482, 486, 494, 513, 518,
// 534, 536-537, 541 and 557 are missing from the extract; presumably this is
// OverlayImpl::start.
// Configures and starts PeerFinder, resolves the bootstrap addresses and the
// fixed peers and hands the results to PeerFinder, then creates the Timer,
// records it in list_ and timer_, and starts its first wait.
479void
481{
483 app_.config(),
484 serverHandler_.setup().overlay.port(),
485 app_.getValidationPublicKey().has_value(),
487
488 m_peerFinder->setConfig(config);
489 m_peerFinder->start();
490
491 // Populate our boot cache: if there are no entries in [ips] then we use
492 // the entries in [ips_fixed].
493 auto bootstrapIps =
495
496 // If nothing is specified, default to several well-known high-capacity
497 // servers to serve as bootstrap:
498 if (bootstrapIps.empty())
499 {
500 // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
501 bootstrapIps.push_back("r.ripple.com 51235");
502
503 // Pool of servers operated by ISRDC - https://isrdc.in
504 bootstrapIps.push_back("sahyadri.isrdc.in 51235");
505
506 // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
507 bootstrapIps.push_back("hubs.xrpkuwait.com 51235");
508
509 // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
510 bootstrapIps.push_back("hub.xrpl-commons.org 51235");
511 }
512
// Callback for the bootstrap names: endpoints with port 0 get
// DEFAULT_PEER_PORT, and the stringified list is added to PeerFinder as
// fallback addresses under the source name "config: <name>".
514 bootstrapIps,
515 [this](
516 std::string const& name,
517 std::vector<beast::IP::Endpoint> const& addresses) {
519 ips.reserve(addresses.size());
520 for (auto const& addr : addresses)
521 {
522 if (addr.port() == 0)
523 ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
524 else
525 ips.push_back(to_string(addr));
526 }
527
528 std::string const base("config: ");
529 if (!ips.empty())
530 m_peerFinder->addFallbackStrings(base + name, ips);
531 });
532
533 // Add the ips_fixed from the rippled.cfg file
535 {
// Callback for the fixed peers: the same port defaulting as above, but the
// endpoints are passed to addFixedPeer rather than stored as strings.
538 [this](
539 std::string const& name,
540 std::vector<beast::IP::Endpoint> const& addresses) {
542 ips.reserve(addresses.size());
543
544 for (auto& addr : addresses)
545 {
546 if (addr.port() == 0)
547 ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
548 else
549 ips.emplace_back(addr);
550 }
551
552 if (!ips.empty())
553 m_peerFinder->addFixedPeer(name, ips);
554 });
555 }
556 auto const timer = std::make_shared<Timer>(*this);
558 list_.emplace(timer.get(), timer);
559 timer_ = timer;
560 timer->async_wait();
561}
562
// NOTE(review): the name line (embedded line 564) is missing from the
// extract; presumably this is OverlayImpl::stop.
// Dispatches stopChildren() onto strand_, blocks on cond_ until list_ is
// empty, and then stops PeerFinder.
563void
565{
566 boost::asio::dispatch(strand_, std::bind(&OverlayImpl::stopChildren, this));
567 {
568 std::unique_lock<decltype(mutex_)> lock(mutex_);
569 cond_.wait(lock, [this] { return list_.empty(); });
570 }
571 m_peerFinder->stop();
572}
573
574//------------------------------------------------------------------------------
575//
576// PropertyStream
577//
578//------------------------------------------------------------------------------
579
// NOTE(review): embedded lines 581 (name line) and 587 (declaration of
// `item`) are missing from the extract; presumably this is onWrite(stream).
// Writes a "traffic" set to the property stream with one item per entry of
// m_traffic.getCounts(): the category name plus the byte and message
// counters for each direction, written as decimal strings.
580void
582{
583 beast::PropertyStream::Set set("traffic", stream);
584 auto const stats = m_traffic.getCounts();
585 for (auto const& pair : stats)
586 {
588 item["category"] = pair.second.name;
589 item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
590 item["messages_in"] = std::to_string(pair.second.messagesIn.load());
591 item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
592 item["messages_out"] = std::to_string(pair.second.messagesOut.load());
593 }
594}
595
596//------------------------------------------------------------------------------
// NOTE(review): embedded lines 603 (name line), 607 and 609 are missing from
// the extract; the assertion text indicates this is OverlayImpl::activate,
// taking `peer`.
// Records `peer` in ids_ under its peer ID, logs the activation, and asserts
// that size() is non-zero afterwards.
602void
604{
605 // Now track this peer
606 {
608 auto const result(ids_.emplace(
610 std::make_tuple(peer->id()),
611 std::make_tuple(peer)));
612 XRPL_ASSERT(
613 result.second,
614 "ripple::OverlayImpl::activate : peer ID is inserted");
615 (void)result.second;
616 }
617
618 JLOG(journal_.debug()) << "activated " << peer->getRemoteAddress() << " ("
619 << peer->id() << ":"
620 << toBase58(
621 TokenType::NodePublic, peer->getNodePublic())
622 << ")";
623
624 // We just accepted this peer so we have non-zero active peers
625 XRPL_ASSERT(size(), "ripple::OverlayImpl::activate : nonzero peers");
626}
627
628void
634
// NOTE(review): embedded lines 636-637 (name line and first parameter `m`)
// are missing from the extract; the assertion text indicates this is
// OverlayImpl::onManifests.
// Processes every manifest in message `m` received from peer `from`.
// Each entry that deserializes is applied to the validator manifest cache.
// Accepted manifests are added to a relay message and published through
// NetworkOPs; if the master key is a listed validator they are also stored
// via addValidatorManifest. Malformed entries are logged and skipped.
// Finally, if anything was accepted, the relay message is sent to every peer
// visited by for_each.
635void
638 std::shared_ptr<PeerImp> const& from)
639{
640 auto const n = m->list_size();
641 auto const& journal = from->pjournal();
642
643 protocol::TMManifests relay;
644
645 for (std::size_t i = 0; i < n; ++i)
646 {
647 auto& s = m->list().Get(i).stobject();
648
649 if (auto mo = deserializeManifest(s))
650 {
// Copy the serialized form first: `mo` is moved from by applyManifest.
651 auto const serialized = mo->serialized;
652
653 auto const result =
654 app_.validatorManifests().applyManifest(std::move(*mo));
655
656 if (result == ManifestDisposition::accepted)
657 {
658 relay.add_list()->set_stobject(s);
659
660 // N.B.: this is important; the applyManifest call above moves
661 // the loaded Manifest out of the optional so we need to
662 // reload it here.
663 mo = deserializeManifest(serialized);
664 XRPL_ASSERT(
665 mo,
666 "ripple::OverlayImpl::onManifests : manifest "
667 "deserialization succeeded");
668
669 app_.getOPs().pubManifest(*mo);
670
671 if (app_.validators().listed(mo->masterKey))
672 {
673 auto db = app_.getWalletDB().checkoutDb();
674 addValidatorManifest(*db, serialized);
675 }
676 }
677 }
678 else
679 {
680 JLOG(journal.debug())
681 << "Malformed manifest #" << i + 1 << ": " << strHex(s);
682 continue;
683 }
684 }
685
686 if (!relay.list().empty())
687 for_each([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
688 std::shared_ptr<PeerImp>&& p) { p->send(m2); });
689}
690
691void
696
697void
// NOTE(review): the signature and embedded line 709 are missing from the
// extract; presumably this is OverlayImpl::size.
// Returns the number of entries currently in ids_.
708{
710 return ids_.size();
711}
712
// NOTE(review): the name line (embedded line 714) is missing from the
// extract; presumably this is OverlayImpl::limit.
// Returns the maxPeers value from PeerFinder's current configuration.
713int
715{
716 return m_peerFinder->config().maxPeers;
717}
718
// NOTE(review): embedded lines 719-720 (signature) and 726 (start of the
// per-peer loop/lambda that introduces `sp`) are missing from the extract;
// presumably this is OverlayImpl::getOverlayInfo.
// Builds a JSON object whose "active" array has one entry per peer: the
// base64-encoded node public key, the connection direction ("in"/"out") and
// the uptime in seconds. When the peer permits crawling, the IP and port are
// included as well. Version and complete-ledger range are added when known.
721{
722 using namespace std::chrono;
723 Json::Value jv;
724 auto& av = jv["active"] = Json::Value(Json::arrayValue);
725
727 auto& pv = av.append(Json::Value(Json::objectValue));
728 pv[jss::public_key] = base64_encode(
729 sp->getNodePublic().data(), sp->getNodePublic().size());
730 pv[jss::type] = sp->slot()->inbound() ? "in" : "out";
731 pv[jss::uptime] = static_cast<std::uint32_t>(
732 duration_cast<seconds>(sp->uptime()).count());
733 if (sp->crawl())
734 {
735 pv[jss::ip] = sp->getRemoteAddress().address().to_string();
// Inbound peers report their listening port when one is known; outbound
// peers report the remote port we connected to, formatted as a string.
736 if (sp->slot()->inbound())
737 {
738 if (auto port = sp->slot()->listening_port())
739 pv[jss::port] = *port;
740 }
741 else
742 {
743 pv[jss::port] = std::to_string(sp->getRemoteAddress().port());
744 }
745 }
746
747 {
748 auto version{sp->getVersion()};
749 if (!version.empty())
750 // Could move here if Json::value supported moving from strings
751 pv[jss::version] = std::string{version};
752 }
753
754 std::uint32_t minSeq, maxSeq;
755 sp->ledgerRange(minSeq, maxSeq);
756 if (minSeq != 0 || maxSeq != 0)
757 pv[jss::complete_ledgers] =
758 std::to_string(minSeq) + "-" + std::to_string(maxSeq);
759 });
760
761 return jv;
762}
763
// NOTE(review): the signature (embedded lines 764-765) is missing from the
// extract; presumably this is OverlayImpl::getServerInfo.
// Fetches the server info from NetworkOPs in non-human-readable, non-admin,
// no-counters form, then strips a fixed set of members before returning it:
// hostid, two load_factor_fee_* values, validation_quorum and, inside
// validated_ledger, the base fee and reserve values.
766{
767 bool const humanReadable = false;
768 bool const admin = false;
769 bool const counters = false;
770
771 Json::Value server_info =
772 app_.getOPs().getServerInfo(humanReadable, admin, counters);
773
774 // Filter out some information
775 server_info.removeMember(jss::hostid);
776 server_info.removeMember(jss::load_factor_fee_escalation);
777 server_info.removeMember(jss::load_factor_fee_queue);
778 server_info.removeMember(jss::validation_quorum);
779
780 if (server_info.isMember(jss::validated_ledger))
781 {
782 Json::Value& validated_ledger = server_info[jss::validated_ledger];
783
784 validated_ledger.removeMember(jss::base_fee);
785 validated_ledger.removeMember(jss::reserve_base_xrp);
786 validated_ledger.removeMember(jss::reserve_inc_xrp);
787 }
788
789 return server_info;
790}
791
797
// NOTE(review): the signature (embedded lines 798-799) is missing from the
// extract; presumably this is OverlayImpl::getUnlInfo.
// Returns the validator list JSON with the "list" member removed from each
// publisher list, and with signing_keys, trusted_validator_keys and
// validation_quorum removed. The validator_sites member from the validator
// sites JSON is moved in when present.
800{
801 Json::Value validators = app_.validators().getJson();
802
803 if (validators.isMember(jss::publisher_lists))
804 {
805 Json::Value& publisher_lists = validators[jss::publisher_lists];
806
807 for (auto& publisher : publisher_lists)
808 {
809 publisher.removeMember(jss::list);
810 }
811 }
812
813 validators.removeMember(jss::signing_keys);
814 validators.removeMember(jss::trusted_validator_keys);
815 validators.removeMember(jss::validation_quorum);
816
817 Json::Value validatorSites = app_.validatorSites().getJson();
818
819 if (validatorSites.isMember(jss::validator_sites))
820 {
821 validators[jss::validator_sites] =
822 std::move(validatorSites[jss::validator_sites]);
823 }
824
825 return validators;
826}
827
// NOTE(review): embedded lines 829-830 (signature) and 832 (declaration of
// `json`) are missing from the extract; presumably this is OverlayImpl::json.
// Appends peer->json() for every peer returned by getActivePeers() and
// returns the accumulated value.
828// Returns information on verified peers.
831{
833 for (auto const& peer : getActivePeers())
834 {
835 json.append(peer->json());
836 }
837 return json;
838}
839
// NOTE(review): embedded lines 841 (name line), 844, 855, 859, 863, 867 and
// 873 are missing from the extract; presumably this is processCrawl(req,
// handoff). The missing lines include the second half of the guard and the
// condition in front of each section block.
// Handles requests whose target is "/crawl": returns false when the guard
// fails, otherwise builds a 200 JSON response with "version" 2 and returns
// true. Each of the four sections below sits behind a condition that is not
// visible here.
840bool
842{
843 if (req.target() != "/crawl" ||
845 return false;
846
847 boost::beast::http::response<json_body> msg;
848 msg.version(req.version());
849 msg.result(boost::beast::http::status::ok);
850 msg.insert("Server", BuildInfo::getFullVersionString());
851 msg.insert("Content-Type", "application/json");
852 msg.insert("Connection", "close");
853 msg.body()["version"] = Json::Value(2u);
854
856 {
857 msg.body()["overlay"] = getOverlayInfo();
858 }
860 {
861 msg.body()["server"] = getServerInfo();
862 }
864 {
865 msg.body()["counts"] = getServerCounts();
866 }
868 {
869 msg.body()["unl"] = getUnlInfo();
870 }
871
872 msg.prepare_payload();
874 return true;
875}
876
// NOTE(review): embedded lines 878 (name line), 904 and 940 are missing from
// the extract; presumably this is processValidatorList and the two missing
// body lines store `msg` into `handoff`.
// Serves "/vl/<key>" and "/vl/<version>/<key>" requests.
// Returns false, leaving the request unhandled, when the target does not
// start with "/vl/" or setup_.vlEnabled is false. Otherwise it returns true
// after preparing a response:
//  - 400 if the version segment is not a valid number or the key is empty;
//  - 404 if getAvailable() returns no value;
//  - 400 if it returns a value that itself tests false;
//  - 200 with the list as the body otherwise.
877bool
879 http_request_type const& req,
880 Handoff& handoff)
881{
882 // If the target is in the form "/vl/<validator_list_public_key>",
883 // return the most recent validator list for that key.
884 constexpr std::string_view prefix("/vl/");
885
886 if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
887 return false;
888
889 std::uint32_t version = 1;
890
891 boost::beast::http::response<json_body> msg;
892 msg.version(req.version());
893 msg.insert("Server", BuildInfo::getFullVersionString());
894 msg.insert("Content-Type", "application/json");
895 msg.insert("Connection", "close");
896
// Shared failure path: sets the given status with a null JSON body and
// reports the request as handled.
897 auto fail = [&msg, &handoff](auto status) {
898 msg.result(status);
899 msg.insert("Content-Length", "0");
900
901 msg.body() = Json::nullValue;
902
903 msg.prepare_payload();
905 return true;
906 };
907
908 std::string_view key = req.target().substr(prefix.size());
909
// If the remainder contains '/', the part before it is the version number
// and the part after it is the key.
910 if (auto slash = key.find('/'); slash != std::string_view::npos)
911 {
912 auto verString = key.substr(0, slash);
913 if (!boost::conversion::try_lexical_convert(verString, version))
914 return fail(boost::beast::http::status::bad_request);
915 key = key.substr(slash + 1);
916 }
917
918 if (key.empty())
919 return fail(boost::beast::http::status::bad_request);
920
921 // find the list
922 auto vl = app_.validators().getAvailable(key, version);
923
924 if (!vl)
925 {
926 // 404 not found
927 return fail(boost::beast::http::status::not_found);
928 }
929 else if (!*vl)
930 {
931 return fail(boost::beast::http::status::bad_request);
932 }
933 else
934 {
935 msg.result(boost::beast::http::status::ok);
936
937 msg.body() = *vl;
938
939 msg.prepare_payload();
941 return true;
942 }
943}
944
// NOTE(review): embedded lines 946 (name line) and 1039 are missing from the
// extract; presumably this is processHealth(req, handoff) and line 1039
// stores `msg` into `handoff`.
// Handles requests whose target is "/health". Returns false for any other
// target. Otherwise it grades the server as healthy, warning or critical
// from the (filtered) server info and returns true after mapping the grade
// to an HTTP status: healthy -> 200, warning -> 503, critical -> 500.
// Only metrics outside their healthy range are reported in the "info"
// object of the response body.
945bool
947{
948 if (req.target() != "/health")
949 return false;
950 boost::beast::http::response<json_body> msg;
951 msg.version(req.version());
952 msg.insert("Server", BuildInfo::getFullVersionString());
953 msg.insert("Content-Type", "application/json");
954 msg.insert("Connection", "close");
955
956 auto info = getServerInfo();
957
// -1 marks "no validated ledger in the server info".
958 int last_validated_ledger_age = -1;
959 if (info.isMember(jss::validated_ledger))
960 last_validated_ledger_age =
961 info[jss::validated_ledger][jss::age].asInt();
962 bool amendment_blocked = false;
963 if (info.isMember(jss::amendment_blocked))
964 amendment_blocked = true;
965 int number_peers = info[jss::peers].asInt();
966 std::string server_state = info[jss::server_state].asString();
967 auto load_factor = info[jss::load_factor_server].asDouble() /
968 info[jss::load_base].asDouble();
969
970 enum { healthy, warning, critical };
971 int health = healthy;
// Health only ever escalates: a lower grade never overrides a higher one.
972 auto set_health = [&health](int state) {
973 if (health < state)
974 health = state;
975 };
976
977 msg.body()[jss::info] = Json::objectValue;
// Ledger age: 7..19 -> warning; 20 or more -> critical. A negative age
// (no validated ledger) is below 20 and therefore grades as warning.
978 if (last_validated_ledger_age >= 7 || last_validated_ledger_age < 0)
979 {
980 msg.body()[jss::info][jss::validated_ledger] =
981 last_validated_ledger_age;
982 if (last_validated_ledger_age < 20)
983 set_health(warning);
984 else
985 set_health(critical);
986 }
987
988 if (amendment_blocked)
989 {
990 msg.body()[jss::info][jss::amendment_blocked] = true;
991 set_health(critical);
992 }
993
// Peer count: 7 or fewer -> warning, unless it is exactly 0 -> critical.
994 if (number_peers <= 7)
995 {
996 msg.body()[jss::info][jss::peers] = number_peers;
997 if (number_peers != 0)
998 set_health(warning);
999 else
1000 set_health(critical);
1001 }
1002
// Server state: full/validating/proposing are healthy;
// syncing/tracking/connected -> warning; anything else -> critical.
1003 if (!(server_state == "full" || server_state == "validating" ||
1004 server_state == "proposing"))
1005 {
1006 msg.body()[jss::info][jss::server_state] = server_state;
1007 if (server_state == "syncing" || server_state == "tracking" ||
1008 server_state == "connected")
1009 {
1010 set_health(warning);
1011 }
1012 else
1013 set_health(critical);
1014 }
1015
// Load factor: above 100 and below 1000 -> warning; 1000 or more -> critical.
1016 if (load_factor > 100)
1017 {
1018 msg.body()[jss::info][jss::load_factor] = load_factor;
1019 if (load_factor < 1000)
1020 set_health(warning);
1021 else
1022 set_health(critical);
1023 }
1024
1025 switch (health)
1026 {
1027 case healthy:
1028 msg.result(boost::beast::http::status::ok);
1029 break;
1030 case warning:
1031 msg.result(boost::beast::http::status::service_unavailable);
1032 break;
1033 case critical:
1034 msg.result(boost::beast::http::status::internal_server_error);
1035 break;
1036 }
1037
1038 msg.prepare_payload();
1040 return true;
1041}
1042
// NOTE(review): the name line (embedded line 1044) is missing from the
// extract; presumably this is processRequest(req, handoff).
// Tries the crawl, validator-list and health handlers in that order and
// returns true as soon as one of them handles the request.
1043bool
1045{
1046 // Take advantage of || short-circuiting
1047 return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
1048 processHealth(req, handoff);
1049}
1050
// NOTE(review): embedded lines 1051-1052 (signature) and 1054 (declaration
// of `ret`) are missing from the extract; presumably this is the
// no-argument OverlayImpl::getActivePeers.
// Collects every peer visited by for_each into `ret`, reserving size()
// entries up front, and returns it.
1053{
1055 ret.reserve(size());
1056
1057 for_each([&ret](std::shared_ptr<PeerImp>&& sp) {
1058 ret.emplace_back(std::move(sp));
1059 });
1060
1061 return ret;
1062}
1063
// NOTE(review): embedded lines 1064-1065 (return type, name), 1071
// (declaration of `ret`) and 1079 (declaration of `p`) are missing from the
// extract; presumably this is the getActivePeers overload with out-counters.
// Under mutex_, returns the peers in ids_ that are still alive and whose ID
// is not in `toSkip`. Out-parameters:
//   active        - number of entries in ids_ (including expired ones);
//   disabled      - live peers with tx reduce-relay disabled (skipped or not);
//   enabledInSkip - live peers in `toSkip` that have tx reduce-relay enabled.
1066 std::set<Peer::id_t> const& toSkip,
1067 std::size_t& active,
1068 std::size_t& disabled,
1069 std::size_t& enabledInSkip) const
1070{
1072 std::lock_guard lock(mutex_);
1073
1074 active = ids_.size();
1075 disabled = enabledInSkip = 0;
1076 ret.reserve(ids_.size());
1077
1078 // NOTE The purpose of p is to delay the destruction of PeerImp
1080 for (auto& [id, w] : ids_)
1081 {
1082 if (p = w.lock(); p != nullptr)
1083 {
1084 bool const reduceRelayEnabled = p->txReduceRelayEnabled();
1085 // tx reduced relay feature disabled
1086 if (!reduceRelayEnabled)
1087 ++disabled;
1088
1089 if (toSkip.count(id) == 0)
1090 ret.emplace_back(std::move(p));
1091 else if (reduceRelayEnabled)
1092 ++enabledInSkip;
1093 }
1094 }
1095
1096 return ret;
1097}
1098
// NOTE(review): the name line (embedded line 1100) is missing from the
// extract; presumably this is checkTracking(index).
// Calls checkTracking(index) on every peer visited by for_each.
1099void
1101{
1102 for_each(
1103 [index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
1104}
1105
// NOTE(review): the signature (embedded lines 1106-1107) is missing from the
// extract; presumably this is findPeerByShortID(id).
// Looks up `id` in ids_ under mutex_. Returns the result of locking the
// stored weak reference, which is empty if the peer has been destroyed, or
// an empty value if the ID is unknown.
1108{
1109 std::lock_guard lock(mutex_);
1110 auto const iter = ids_.find(id);
1111 if (iter != ids_.end())
1112 return iter->second.lock();
1113 return {};
1114}
1115
// NOTE(review): embedded lines 1118-1119 (signature) and 1123 (declaration
// of `peer`) are missing from the extract; presumably this is
// findPeerByPublicKey(pubKey).
// Scans ids_ linearly under mutex_ and returns the first live peer whose
// node public key equals `pubKey`, or an empty value if none matches.
1116// A public key hash map was not used due to the peer connect/disconnect
1117// update overhead outweighing the performance of a small set linear search.
1120{
1121 std::lock_guard lock(mutex_);
1122 // NOTE The purpose of peer is to delay the destruction of PeerImp
1124 for (auto const& e : ids_)
1125 {
1126 if (peer = e.second.lock(); peer != nullptr)
1127 {
1128 if (peer->getNodePublic() == pubKey)
1129 return peer;
1130 }
1131 }
1132 return {};
1133}
1134
1135void
1136OverlayImpl::broadcast(protocol::TMProposeSet& m)
1137{
1138 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
1139 for_each([&](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1140}
1141
// NOTE(review): embedded lines 1142-1143 (return type, name) and 1152 (start
// of the per-peer loop/lambda that introduces `p`) are missing from the
// extract; presumably this is the relay overload for proposals.
// Asks the HashRouter whether `uid` should be relayed. If so, builds the
// proposal message once, sends it to every peer whose ID is not in the
// returned skip set, and returns that set. Otherwise returns an empty value.
1144 protocol::TMProposeSet& m,
1145 uint256 const& uid,
1146 PublicKey const& validator)
1147{
1148 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1149 {
1150 auto const sm =
1151 std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
1153 if (toSkip->find(p->id()) == toSkip->end())
1154 p->send(sm);
1155 });
1156 return *toSkip;
1157 }
1158 return {};
1159}
1160
1161void
1162OverlayImpl::broadcast(protocol::TMValidation& m)
1163{
1164 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
1165 for_each([sm](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1166}
1167
// NOTE(review): embedded lines 1168-1169 (return type, name) and 1178 (start
// of the per-peer loop/lambda that introduces `p`) are missing from the
// extract; presumably this is the relay overload for validations.
// Asks the HashRouter whether `uid` should be relayed. If so, builds the
// validation message once, sends it to every peer whose ID is not in the
// returned skip set, and returns that set. Otherwise returns an empty value.
1170 protocol::TMValidation& m,
1171 uint256 const& uid,
1172 PublicKey const& validator)
1173{
1174 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1175 {
1176 auto const sm =
1177 std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
1179 if (toSkip->find(p->id()) == toSkip->end())
1180 p->send(sm);
1181 });
1182 return *toSkip;
1183 }
1184 return {};
1185}
1186
// NOTE(review): embedded lines 1187-1188 (signature), 1190, 1197, 1205 and
// 1208 are missing from the extract; presumably this is
// OverlayImpl::getManifestsMessage.
// Returns the cached manifestMessage_. The cache is rebuilt only when the
// manifest cache's sequence differs from manifestListSeq_: every manifest is
// serialized into a TMManifests message and its hash is added to the
// HashRouter's suppression set.
1189{
1191
1192 if (auto seq = app_.validatorManifests().sequence();
1193 seq != manifestListSeq_)
1194 {
1195 protocol::TMManifests tm;
1196
// First callback reserves space for the reported count; the second appends
// each manifest and suppresses its hash.
1198 [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
1199 [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
1200 tm.add_list()->set_stobject(
1201 manifest.serialized.data(), manifest.serialized.size());
1202 hr.addSuppression(manifest.hash());
1203 });
1204
1206
1207 if (tm.list_size() != 0)
1209 std::make_shared<Message>(tm, protocol::mtMANIFESTS);
1210
1211 manifestListSeq_ = seq;
1212 }
1213
1214 return manifestMessage_;
1215}
1216
// NOTE(review): embedded lines 1218 (name line), 1220 (the `tx` parameter),
// 1238 and 1258-1259 are missing from the extract; presumably this is the
// relay overload for transactions.
// Relays the transaction identified by `hash` to peers not in `toSkip`.
//  - `tx` absent or a pseudo-transaction: nothing is sent. After a guard
//    whose condition is not visible here, the hash is queued on every
//    eligible peer via addTxQueue.
//  - reduce-relay disabled, or too few peers: the message goes to every
//    eligible peer.
//  - otherwise: peers with the feature disabled always get the message,
//    feature-enabled peers get it until the `enabledTarget` quota is
//    reached, and the remaining peers only get the hash queued.
1217void
1219 uint256 const& hash,
1221 std::set<Peer::id_t> const& toSkip)
1222{
1223 bool relay = tx.has_value();
1224 if (relay)
1225 {
1226 auto& txn = tx->get();
1227 SerialIter sit(makeSlice(txn.rawtransaction()));
1228 relay = !isPseudoTx(STTx{sit});
1229 }
1230
1231 Overlay::PeerSequence peers = {};
1232 std::size_t total = 0;
1233 std::size_t disabled = 0;
1234 std::size_t enabledInSkip = 0;
1235
1236 if (!relay)
1237 {
// NOTE(review): the condition guarding this return (embedded line 1238) is
// not visible in this extract.
1239 return;
1240
1241 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1242 JLOG(journal_.trace())
1243 << "not relaying tx, total peers " << peers.size();
1244 for (auto const& p : peers)
1245 p->addTxQueue(hash);
1246 return;
1247 }
1248
1249 auto& txn = tx->get();
1250 auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
1251 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1252 auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
1253
1254 if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
1255 {
1256 for (auto const& p : peers)
1257 p->send(sm);
1260 txMetrics_.addMetrics(total, toSkip.size(), 0);
1261 return;
1262 }
1263
1264 // We have more peers than the minimum (disabled + minimum enabled),
1265 // relay to all disabled and some randomly selected enabled that
1266 // do not have the transaction.
1267 auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
1268 (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
1269
1270 txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
1271
// Shuffle only when some feature-enabled peers still need to be chosen.
1272 if (enabledTarget > enabledInSkip)
1273 std::shuffle(peers.begin(), peers.end(), default_prng());
1274
1275 JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
1276 << " selected " << enabledTarget << " skip "
1277 << toSkip.size() << " disabled " << disabled;
1278
1279 // count skipped peers with the enabled feature towards the quota
// NOTE(review): enabledInSkip is a std::size_t but is stored in a
// std::uint16_t here, so a value above 65535 would be truncated - confirm
// whether the narrowing is intended.
1280 std::uint16_t enabledAndRelayed = enabledInSkip;
1281 for (auto const& p : peers)
1282 {
1283 // always relay to a peer with the disabled feature
1284 if (!p->txReduceRelayEnabled())
1285 {
1286 p->send(sm);
1287 }
1288 else if (enabledAndRelayed < enabledTarget)
1289 {
1290 enabledAndRelayed++;
1291 p->send(sm);
1292 }
1293 else
1294 {
1295 p->addTxQueue(hash);
1296 }
1297 }
1298}
1299
1300//------------------------------------------------------------------------------
1301
// NOTE(review): the name line (embedded line 1303) is missing from the
// extract; presumably this is OverlayImpl::remove(Child& child).
// Erases `child` from list_ under mutex_. When list_ becomes empty, notifies
// all waiters on cond_ (stop() waits on that condition).
1302void
1304{
1305 std::lock_guard lock(mutex_);
1306 list_.erase(&child);
1307 if (list_.empty())
1308 cond_.notify_all();
1309}
1310
// NOTE(review): embedded lines 1312 (name line), 1322 (declaration of
// `children`) and 1327 are missing from the extract; presumably this is
// OverlayImpl::stopChildren.
// Returns immediately when work_ is not set. Otherwise snapshots the
// children in list_ while holding mutex_, releases the lock, and calls
// stop() on each child that is still alive.
1311void
1313{
1314 // Calling list_[].second->stop() may cause list_ to be modified
1315 // (OverlayImpl::remove() may be called on this same thread). So
1316 // iterating directly over list_ to call child->stop() could lead to
1317 // undefined behavior.
1318 //
1319 // Therefore we copy all of the weak/shared ptrs out of list_ before we
1320 // start calling stop() on them. That guarantees OverlayImpl::remove()
1321 // won't be called until vector<> children leaves scope.
1323 {
1324 std::lock_guard lock(mutex_);
1325 if (!work_)
1326 return;
1328
1329 children.reserve(list_.size());
1330 for (auto const& element : list_)
1331 {
1332 children.emplace_back(element.second.lock());
1333 }
1334 } // lock released
1335
1336 for (auto const& child : children)
1337 {
1338 if (child != nullptr)
1339 child->stop();
1340 }
1341}
1342
// Asks m_peerFinder->autoconnect() for a set of addresses and calls
// connect() on each of them.
//
// NOTE(review): the declarator line (embedded listing line 1344) is not
// present in this listing.
1343void
1345{
1346 auto const result = m_peerFinder->autoconnect();
1347 for (auto addr : result)
1348 connect(addr);
1349}
1350
// Sends the endpoint lists produced by
// m_peerFinder->buildEndpointsForPeers() to the matching peers.
//
// For each entry `e`, e.first is looked up in m_peers while holding
// mutex_; if the stored weak pointer can still be locked, the range
// [e.second.begin(), e.second.end()) is handed to that peer's
// sendEndpoints() after the lock has been released.
//
// NOTE(review): the declarator (embedded line 1352) and the declaration of
// `peer` (embedded line 1357) are not present in this listing.
1351void
1353{
1354 auto const result = m_peerFinder->buildEndpointsForPeers();
1355 for (auto const& e : result)
1356 {
1358 {
1359 std::lock_guard lock(mutex_);
1360 auto const iter = m_peers.find(e.first);
1361 if (iter != m_peers.end())
1362 peer = iter->second.lock();
1363 }
// `peer` stays null when the key is unknown or the peer has expired.
1364 if (peer)
1365 peer->sendEndpoints(e.second.begin(), e.second.end());
1366 }
1367}
1368
// Visits every peer via for_each() and calls sendTxQueue() on those for
// which txReduceRelayEnabled() returns true; other peers are left alone.
//
// NOTE(review): the declarator line (embedded listing line 1370) is not
// present in this listing.
1369void
1371{
1372 for_each([](auto const& p) {
1373 if (p->txReduceRelayEnabled())
1374 p->sendTxQueue();
1375 });
1376}
1377
// Builds a protocol::TMSquelch message wrapped in a shared Message of type
// protocol::mtSQUELCH.
//
// validator        public key whose bytes are copied into the message
// squelch          value stored in the message's squelch field
// squelchDuration  stored in the message only when `squelch` is true
//
// NOTE(review): the return type and function-name lines (embedded lines
// 1378-1379) are not present in this listing.
1380 PublicKey const& validator,
1381 bool squelch,
1382 uint32_t squelchDuration)
1383{
1384 protocol::TMSquelch m;
1385 m.set_squelch(squelch);
1386 m.set_validatorpubkey(validator.data(), validator.size());
// The duration field is left unset for an unsquelch message.
1387 if (squelch)
1388 m.set_squelchduration(squelchDuration);
1389 return std::make_shared<Message>(m, protocol::mtSQUELCH);
1390}
1391
// Sends an unsquelch message (squelch = false, duration 0) for `validator`
// to the peer returned by findPeerByShortID(id). Does nothing when no such
// peer is found.
//
// NOTE(review): the declarator line (embedded listing line 1393) is not
// present in this listing; the body uses parameters named `validator` and
// `id`.
1392void
1394{
1395 if (auto peer = findPeerByShortID(id); peer)
1396 {
1397 // optimize - multiple message with different
1398 // validator might be sent to the same peer
1399 peer->send(makeSquelchMessage(validator, false, 0));
1400 }
1401}
1402
// Sends a squelch message (squelch = true) carrying `squelchDuration` for
// `validator` to the peer returned by findPeerByShortID(id). Does nothing
// when no such peer is found.
//
// NOTE(review): the function-name line (embedded listing line 1404) is not
// present in this listing.
1403void
1405 PublicKey const& validator,
1406 Peer::id_t id,
1407 uint32_t squelchDuration) const
1408{
1409 if (auto peer = findPeerByShortID(id); peer)
1410 {
1411 peer->send(makeSquelchMessage(validator, true, squelchDuration));
1412 }
1413}
1414
// Peer-set overload: forwards (key, validator, type) to
// slots_.updateSlotAndSquelch() once for every id in `peers`.
//
// Returns immediately when slots_.baseSquelchReady() is false. When called
// off strand_, the call is re-posted onto strand_ and returns; the actual
// update then runs from the posted handler.
//
// NOTE(review): the function-name line (embedded line 1416) and the body of
// the callback passed to slots_ (embedded line 1439) are not present in
// this listing.
1415void
1417 uint256 const& key,
1418 PublicKey const& validator,
1419 std::set<Peer::id_t>&& peers,
1420 protocol::MessageType type)
1421{
1422 if (!slots_.baseSquelchReady())
1423 return;
1424
1425 if (!strand_.running_in_this_thread())
1426 return post(
1427 strand_,
1428 // Must capture copies of reference parameters (i.e. key, validator)
1429 [this,
1430 key = key,
1431 validator = validator,
1432 peers = std::move(peers),
1433 type]() mutable {
// `mutable` lets the captured set be moved on into the re-entrant call.
1434 updateSlotAndSquelch(key, validator, std::move(peers), type);
1435 });
1436
1437 for (auto id : peers)
1438 slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
1440 });
1441}
1442
// Single-peer overload: forwards (key, validator, peer, type) to
// slots_.updateSlotAndSquelch().
//
// Returns immediately when slots_.baseSquelchReady() is false. When called
// off strand_, the call is re-posted onto strand_ and returns; the actual
// update then runs from the posted handler.
//
// NOTE(review): the function-name line (embedded line 1444) and the body of
// the callback passed to slots_ (embedded line 1462) are not present in
// this listing.
1443void
1445 uint256 const& key,
1446 PublicKey const& validator,
1447 Peer::id_t peer,
1448 protocol::MessageType type)
1449{
1450 if (!slots_.baseSquelchReady())
1451 return;
1452
1453 if (!strand_.running_in_this_thread())
1454 return post(
1455 strand_,
1456 // Must capture copies of reference parameters (i.e. key, validator)
1457 [this, key = key, validator = validator, peer, type]() {
1458 updateSlotAndSquelch(key, validator, peer, type);
1459 });
1460
1461 slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
1463 });
1464}
1465
// Forwards the deletion of peer `id` to slots_.deletePeer(). When called
// off strand_, the call is re-posted onto strand_ instead of running
// inline.
//
// NOTE(review): the declarator line (embedded line 1467) is not present in
// this listing, and the meaning of the `true` argument is defined by
// slots_.deletePeer(), which is not visible here.
1466void
1468{
1469 if (!strand_.running_in_this_thread())
1470 return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
1471
1472 slots_.deletePeer(id, true);
1473}
1474
// Forwards to slots_.deleteIdlePeers(). When called off strand_, the call
// is re-posted onto strand_ instead of running inline.
//
// NOTE(review): the declarator line (embedded listing line 1476) is not
// present in this listing.
1475void
1477{
1478 if (!strand_.running_in_this_thread())
1479 return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this));
1480
1481 slots_.deleteIdlePeers();
1482}
1483
1484//------------------------------------------------------------------------------
1485
// Builds an Overlay::Setup from the [overlay], [crawl] and [vl] sections
// and the legacy "network_id" value of `config`.
//
// Throws std::runtime_error (via Throw<>) when the IP limit is negative,
// the public IP fails to parse or is private, the [crawl] section holds
// more than one bare value or a value that does not parse as bool, or the
// network id is not convertible to std::uint32_t.
//
// NOTE(review): the declarator lines (embedded lines 1486-1487) and the
// bodies of the four crawl-option branches (embedded lines 1541, 1545,
// 1549, 1553) are not present in this listing.
1488{
1489 Overlay::Setup setup;
1490
1491 {
1492 auto const& section = config.section("overlay");
1493 setup.context = make_SSLContext("");
1494
1495 set(setup.ipLimit, "ip_limit", section);
1496 if (setup.ipLimit < 0)
1497 Throw<std::runtime_error>("Configured IP limit is invalid");
1498
1499 std::string ip;
1500 set(ip, "public_ip", section);
// An absent or empty public_ip leaves setup.public_ip untouched.
1501 if (!ip.empty())
1502 {
1503 boost::system::error_code ec;
1504 setup.public_ip = boost::asio::ip::make_address(ip, ec);
1505 if (ec || beast::IP::is_private(setup.public_ip))
1506 Throw<std::runtime_error>("Configured public IP is invalid");
1507 }
1508 }
1509
1510 {
1511 auto const& section = config.section("crawl");
1512 auto const& values = section.values();
1513
1514 if (values.size() > 1)
1515 {
1516 Throw<std::runtime_error>(
1517 "Configured [crawl] section is invalid, too many values");
1518 }
1519
// Crawl stays enabled unless a single bare value says otherwise.
1520 bool crawlEnabled = true;
1521
1522 // Only allow "0|1" as a value
1523 if (values.size() == 1)
1524 {
1525 try
1526 {
1527 crawlEnabled = boost::lexical_cast<bool>(values.front());
1528 }
1529 catch (boost::bad_lexical_cast const&)
1530 {
1531 Throw<std::runtime_error>(
1532 "Configured [crawl] section has invalid value: " +
1533 values.front());
1534 }
1535 }
1536
// Per-category switches; "overlay", "server" and "unl" default to
// true, "counts" defaults to false.
// NOTE(review): the branch bodies are empty in this listing;
// presumably each records its category in setup - confirm against
// the full file.
1537 if (crawlEnabled)
1538 {
1539 if (get<bool>(section, "overlay", true))
1540 {
1542 }
1543 if (get<bool>(section, "server", true))
1544 {
1546 }
1547 if (get<bool>(section, "counts", false))
1548 {
1550 }
1551 if (get<bool>(section, "unl", true))
1552 {
1554 }
1555 }
1556 }
1557 {
1558 auto const& section = config.section("vl");
1559
1560 set(setup.vlEnabled, "enabled", section);
1561 }
1562
// Any exception raised while reading or converting the network id is
// replaced by the single runtime_error thrown in the catch block.
1563 try
1564 {
1565 auto id = config.legacy("network_id");
1566
1567 if (!id.empty())
1568 {
// Well-known network names are mapped to their numeric ids
// before the numeric conversion.
1569 if (id == "main")
1570 id = "0";
1571
1572 if (id == "testnet")
1573 id = "1";
1574
1575 if (id == "devnet")
1576 id = "2";
1577
1578 setup.networkID = beast::lexicalCastThrow<std::uint32_t>(id);
1579 }
1580 }
1581 catch (...)
1582 {
1583 Throw<std::runtime_error>(
1584 "Configured [network_id] section is invalid: must be a number "
1585 "or one of the strings 'main', 'testnet' or 'devnet'.");
1586 }
1587
1588 return setup;
1589}
1590
// Factory for the Overlay implementation: passes all eight arguments
// through, unchanged and in the same order, to the object it constructs.
//
// NOTE(review): the return type and function-name lines (embedded lines
// 1591-1592) and the opening line of the constructing expression (embedded
// line 1602) are not present in this listing.
1593 Application& app,
1594 Overlay::Setup const& setup,
1595 ServerHandler& serverHandler,
1596 Resource::Manager& resourceManager,
1597 Resolver& resolver,
1598 boost::asio::io_context& io_context,
1599 BasicConfig const& config,
1600 beast::insight::Collector::ptr const& collector)
1601{
1603 app,
1604 setup,
1605 serverHandler,
1606 resourceManager,
1607 resolver,
1608 io_context,
1609 config,
1610 collector);
1611}
1612
1613} // namespace ripple
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:149
Value & append(Value const &value)
Append value to array at the end.
Value removeMember(char const *key)
Remove and return the named member.
bool isMember(char const *key) const
Return true if the object has a member named key.
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
A generic endpoint for log messages.
Definition Journal.h:60
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
std::string const & name() const
Returns the name of this source.
void add(Source &source)
Add a child source.
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:34
virtual Config & config()=0
virtual beast::Journal journal(std::string const &name)=0
virtual ValidatorSite & validatorSites()=0
virtual DatabaseCon & getWalletDB()=0
Retrieve the "wallet database".
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual ManifestCache & validatorManifests()=0
virtual PeerReservationTable & peerReservations()=0
virtual Cluster & cluster()=0
virtual Logs & logs()=0
virtual HashRouter & getHashRouter()=0
Holds unparsed configuration information.
Section & section(std::string const &name)
Returns the section with the given name.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:38
std::vector< std::string > IPS_FIXED
Definition Config.h:144
std::vector< std::string > IPS
Definition Config.h:141
bool standalone() const
Definition Config.h:336
std::size_t TX_REDUCE_RELAY_MIN_PEERS
Definition Config.h:268
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:258
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:265
std::size_t TX_RELAY_PERCENTAGE
Definition Config.h:271
LockedSociSession checkoutDb()
std::optional< std::set< PeerShortID > > shouldRelay(uint256 const &key)
Determines whether the hashed item should be relayed.
virtual void pubManifest(Manifest const &)=0
std::uint32_t sequence() const
A monotonically increasing number used to detect new manifests.
Definition Manifest.h:278
void for_each_manifest(Function &&f) const
Invokes the callback once for every populated manifest.
Definition Manifest.h:425
ManifestDisposition applyManifest(Manifest m)
Add manifest to cache.
Definition Manifest.cpp:383
virtual Json::Value getServerInfo(bool human, bool admin, bool counters)=0
Child(OverlayImpl &overlay)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > work_
boost::system::error_code error_code
Definition OverlayImpl.h:84
Json::Value getUnlInfo()
Returns information about the local server's UNL.
void stop() override
static std::string makePrefix(std::uint32_t id)
PeerFinder::Manager & peerFinder()
boost::asio::ip::tcp::endpoint endpoint_type
Definition OverlayImpl.h:83
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
boost::asio::ip::address address_type
Definition OverlayImpl.h:82
boost::asio::io_context & io_context_
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
std::condition_variable_any cond_
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
Resolver & m_resolver
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void start() override
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
void add_active(std::shared_ptr< PeerImp > const &peer)
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
Resource::Manager & m_resourceManager
std::shared_ptr< Message > manifestMessage_
std::optional< std::uint32_t > manifestListSeq_
TrafficCount m_traffic
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
std::atomic< Peer::id_t > next_id_
Application & app_
std::weak_ptr< Timer > timer_
metrics::TxMetrics txMetrics_
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
std::recursive_mutex mutex_
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
OverlayImpl(Application &app, Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
void sendTxQueue()
Send once a second transactions' hashes aggregated by peers.
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
boost::asio::strand< boost::asio::io_context::executor_type > strand_
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
void for_each(UnaryFunc &&f) const
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
Resource::Manager & resourceManager()
static bool isPeerUpgrade(http_request_type const &request)
Json::Value getServerCounts()
Returns information about the local server's performance counters.
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::unique_ptr< PeerFinder::Manager > m_peerFinder
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Json::Value getServerInfo()
Returns information about the local server.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
Json::Value json() override
Return diagnostics on the status of all peers.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
ServerHandler & serverHandler_
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
int limit() override
Returns the maximum number of peers we are configured to allow.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
beast::Journal const journal_
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Manages the set of connected peers.
Definition Overlay.h:49
virtual std::shared_ptr< Slot > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)=0
Create a new outbound slot with the specified remote endpoint.
bool contains(PublicKey const &nodeId)
A public key.
Definition PublicKey.h:61
void resolve(std::vector< std::string > const &names, Handler handler)
resolve all hostnames on the list
Definition Resolver.h:57
Tracks load and resource consumption.
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
virtual Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by outbound IP address and port.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:79
void setup(Setup const &setup, beast::Journal journal)
auto const & getCounts() const
An up-to-date copy of all the counters.
void addCount(category cat, bool inbound, int bytes)
Account for traffic associated with the given category.
bool listed(PublicKey const &identity) const
Returns true if public key is included on any lists.
std::optional< Json::Value > getAvailable(std::string_view pubKey, std::optional< std::uint32_t > forceVersion={})
Returns the current valid list for the given publisher key, if available, as a Json object.
Json::Value getJson() const
Return a JSON representation of the state of the validator list.
Json::Value getJson() const
Return JSON representation of configured validator sites.
T count(T... args)
T data(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T find_if(T... args)
T get(T... args)
T is_same_v
T make_tuple(T... args)
@ nullValue
'null' value
Definition json_value.h:38
@ arrayValue
array value (ordered list)
Definition json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
bool is_private(Address const &addr)
Returns true if the address is a private unroutable address.
Definition IPAddress.h:71
Result split_commas(FwdIt first, FwdIt last)
Definition rfc2616.h:199
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition rfc2616.h:386
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:81
@ checkIdlePeers
How often we check for idle peers (seconds)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
std::optional< ProtocolVersion > negotiateProtocolVersion(std::vector< ProtocolVersion > const &versions)
Given a list of supported protocol versions, choose the one we prefer.
std::optional< Manifest > deserializeManifest(Slice s, beast::Journal journal)
Constructs Manifest from serialized string.
Definition Manifest.cpp:54
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:820
void addValidatorManifest(soci::session &session, std::string const &serialized)
addValidatorManifest Saves the manifest of a validator to the database.
Definition Wallet.cpp:119
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
std::shared_ptr< Message > makeSquelchMessage(PublicKey const &validator, bool squelch, uint32_t squelchDuration)
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:30
@ accepted
Manifest is valid.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:244
std::string base64_encode(std::uint8_t const *data, std::size_t len)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:119
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:33
Json::Value getCountsJson(Application &app, int minObjectCount)
Definition GetCounts.cpp:62
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
std::unique_ptr< Overlay > make_Overlay(Application &app, Overlay::Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_context &io_context, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
Creates the implementation of Overlay.
@ manifest
Manifest.
Overlay::Setup setup_Overlay(BasicConfig const &config)
constexpr Number squelch(Number const &x, Number const &limit) noexcept
Definition Number.h:363
beast::xor_shift_engine & default_prng()
Return the default random engine.
STL namespace.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T reserve(T... args)
T reset(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
static boost::asio::ip::tcp::endpoint to_asio_endpoint(IP::Endpoint const &address)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:40
bool keep_alive
Definition Handoff.h:46
std::shared_ptr< Writer > response
Definition Handoff.h:49
void on_timer(error_code ec)
Timer(OverlayImpl &overlay)
beast::IP::Address public_ip
Definition Overlay.h:69
std::uint32_t crawlOptions
Definition Overlay.h:71
std::shared_ptr< boost::asio::ssl::context > context
Definition Overlay.h:68
std::optional< std::uint32_t > networkID
Definition Overlay.h:72
PeerFinder configuration settings.
static Config makeConfig(ripple::Config const &config, std::uint16_t port, bool validationPublicKey, int ipLimit)
Make PeerFinder::Config from configuration parameters.
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition TxMetrics.cpp:31
T substr(T... args)
T to_string(T... args)
T what(T... args)