rippled
Loading...
Searching...
No Matches
OverlayImpl.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/misc/HashRouter.h>
21#include <xrpld/app/misc/NetworkOPs.h>
22#include <xrpld/app/misc/ValidatorList.h>
23#include <xrpld/app/misc/ValidatorSite.h>
24#include <xrpld/app/rdb/RelationalDatabase.h>
25#include <xrpld/app/rdb/Wallet.h>
26#include <xrpld/overlay/Cluster.h>
27#include <xrpld/overlay/detail/ConnectAttempt.h>
28#include <xrpld/overlay/detail/PeerImp.h>
29#include <xrpld/overlay/detail/TrafficCount.h>
30#include <xrpld/overlay/detail/Tuning.h>
31#include <xrpld/overlay/predicates.h>
32#include <xrpld/peerfinder/make_Manager.h>
33#include <xrpld/rpc/handlers/GetCounts.h>
34#include <xrpld/rpc/json_body.h>
35
36#include <xrpl/basics/base64.h>
37#include <xrpl/basics/make_SSLContext.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/beast/core/LexicalCast.h>
40#include <xrpl/protocol/STTx.h>
41#include <xrpl/server/SimpleWriter.h>
42
43#include <boost/algorithm/string/predicate.hpp>
44
45namespace ripple {
46
47namespace CrawlOptions {
48enum {
50 Overlay = (1 << 0),
51 ServerInfo = (1 << 1),
52 ServerCounts = (1 << 2),
53 Unl = (1 << 3)
54};
55}
56
57//------------------------------------------------------------------------------
58
59OverlayImpl::Child::Child(OverlayImpl& overlay) : overlay_(overlay)
60{
61}
62
64{
65 overlay_.remove(*this);
66}
67
68//------------------------------------------------------------------------------
69
71 : Child(overlay), timer_(overlay_.io_service_)
72{
73}
74
75void
77{
78 // This method is only ever called from the same strand that calls
79 // Timer::on_timer, ensuring they never execute concurrently.
80 stopping_ = true;
81 timer_.cancel();
82}
83
84void
86{
87 timer_.expires_after(std::chrono::seconds(1));
88 timer_.async_wait(overlay_.strand_.wrap(std::bind(
89 &Timer::on_timer, shared_from_this(), std::placeholders::_1)));
90}
91
92void
94{
95 if (ec || stopping_)
96 {
97 if (ec && ec != boost::asio::error::operation_aborted)
98 {
99 JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
100 }
101 return;
102 }
103
104 overlay_.m_peerFinder->once_per_second();
105 overlay_.sendEndpoints();
106 overlay_.autoConnect();
107 if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
108 overlay_.sendTxQueue();
109
110 if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
111 overlay_.deleteIdlePeers();
112
113 async_wait();
114}
115
116//------------------------------------------------------------------------------
117
119 Application& app,
120 Setup const& setup,
121 ServerHandler& serverHandler,
123 Resolver& resolver,
124 boost::asio::io_service& io_service,
125 BasicConfig const& config,
126 beast::insight::Collector::ptr const& collector)
127 : app_(app)
128 , io_service_(io_service)
129 , work_(std::in_place, std::ref(io_service_))
131 , setup_(setup)
132 , journal_(app_.journal("Overlay"))
133 , serverHandler_(serverHandler)
135 , m_peerFinder(PeerFinder::make_Manager(
136 io_service,
137 stopwatch(),
138 app_.journal("PeerFinder"),
139 config,
140 collector))
141 , m_resolver(resolver)
142 , next_id_(1)
143 , timer_count_(0)
144 , slots_(app.logs(), *this, app.config())
145 , m_stats(
146 std::bind(&OverlayImpl::collect_metrics, this),
147 collector,
148 [counts = m_traffic.getCounts(), collector]() {
150
151 for (auto const& pair : counts)
152 ret.emplace(
153 pair.first, TrafficGauges(pair.second.name, collector));
154
155 return ret;
156 }())
157{
159}
160
161Handoff
163 std::unique_ptr<stream_type>&& stream_ptr,
164 http_request_type&& request,
165 endpoint_type remote_endpoint)
166{
167 auto const id = next_id_++;
168 beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
169 beast::Journal journal(sink);
170
171 Handoff handoff;
172 if (processRequest(request, handoff))
173 return handoff;
174 if (!isPeerUpgrade(request))
175 return handoff;
176
177 handoff.moved = true;
178
179 JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;
180
181 error_code ec;
182 auto const local_endpoint(
183 stream_ptr->next_layer().socket().local_endpoint(ec));
184 if (ec)
185 {
186 JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
187 return handoff;
188 }
189
192 if (consumer.disconnect(journal))
193 return handoff;
194
195 auto const slot = m_peerFinder->new_inbound_slot(
198
199 if (slot == nullptr)
200 {
201 // self-connect, close
202 handoff.moved = false;
203 return handoff;
204 }
205
206 // Validate HTTP request
207
208 {
209 auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
210 if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
211 return boost::iequals(s, "peer");
212 }) == types.end())
213 {
214 handoff.moved = false;
215 handoff.response =
216 makeRedirectResponse(slot, request, remote_endpoint.address());
217 handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
218 return handoff;
219 }
220 }
221
222 auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
223 if (!negotiatedVersion)
224 {
225 m_peerFinder->on_closed(slot);
226 handoff.moved = false;
227 handoff.response = makeErrorResponse(
228 slot,
229 request,
230 remote_endpoint.address(),
231 "Unable to agree on a protocol version");
232 handoff.keep_alive = false;
233 return handoff;
234 }
235
236 auto const sharedValue = makeSharedValue(*stream_ptr, journal);
237 if (!sharedValue)
238 {
239 m_peerFinder->on_closed(slot);
240 handoff.moved = false;
241 handoff.response = makeErrorResponse(
242 slot,
243 request,
244 remote_endpoint.address(),
245 "Incorrect security cookie");
246 handoff.keep_alive = false;
247 return handoff;
248 }
249
250 try
251 {
252 auto publicKey = verifyHandshake(
253 request,
254 *sharedValue,
257 remote_endpoint.address(),
258 app_);
259
260 {
261 // The node gets a reserved slot if it is in our cluster
262 // or if it has a reservation.
263 bool const reserved =
264 static_cast<bool>(app_.cluster().member(publicKey)) ||
265 app_.peerReservations().contains(publicKey);
266 auto const result =
267 m_peerFinder->activate(slot, publicKey, reserved);
268 if (result != PeerFinder::Result::success)
269 {
270 m_peerFinder->on_closed(slot);
271 JLOG(journal.debug()) << "Peer " << remote_endpoint
272 << " redirected, " << to_string(result);
273 handoff.moved = false;
275 slot, request, remote_endpoint.address());
276 handoff.keep_alive = false;
277 return handoff;
278 }
279 }
280
281 auto const peer = std::make_shared<PeerImp>(
282 app_,
283 id,
284 slot,
285 std::move(request),
286 publicKey,
287 *negotiatedVersion,
288 consumer,
289 std::move(stream_ptr),
290 *this);
291 {
292 // As we are not on the strand, run() must be called
293 // while holding the lock, otherwise new I/O can be
294 // queued after a call to stop().
295 std::lock_guard<decltype(mutex_)> lock(mutex_);
296 {
297 auto const result = m_peers.emplace(peer->slot(), peer);
298 XRPL_ASSERT(
299 result.second,
300 "ripple::OverlayImpl::onHandoff : peer is inserted");
301 (void)result.second;
302 }
303 list_.emplace(peer.get(), peer);
304
305 peer->run();
306 }
307 handoff.moved = true;
308 return handoff;
309 }
310 catch (std::exception const& e)
311 {
312 JLOG(journal.debug()) << "Peer " << remote_endpoint
313 << " fails handshake (" << e.what() << ")";
314
315 m_peerFinder->on_closed(slot);
316 handoff.moved = false;
317 handoff.response = makeErrorResponse(
318 slot, request, remote_endpoint.address(), e.what());
319 handoff.keep_alive = false;
320 return handoff;
321 }
322}
323
324//------------------------------------------------------------------------------
325
326bool
328{
329 if (!is_upgrade(request))
330 return false;
331 auto const versions = parseProtocolVersions(request["Upgrade"]);
332 return !versions.empty();
333}
334
337{
339 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
340 return ss.str();
341}
342
346 http_request_type const& request,
347 address_type remote_address)
348{
349 boost::beast::http::response<json_body> msg;
350 msg.version(request.version());
351 msg.result(boost::beast::http::status::service_unavailable);
352 msg.insert("Server", BuildInfo::getFullVersionString());
353 {
355 ostr << remote_address;
356 msg.insert("Remote-Address", ostr.str());
357 }
358 msg.insert("Content-Type", "application/json");
359 msg.insert(boost::beast::http::field::connection, "close");
360 msg.body() = Json::objectValue;
361 {
362 Json::Value& ips = (msg.body()["peer-ips"] = Json::arrayValue);
363 for (auto const& _ : m_peerFinder->redirect(slot))
364 ips.append(_.address.to_string());
365 }
366 msg.prepare_payload();
367 return std::make_shared<SimpleWriter>(msg);
368}
369
373 http_request_type const& request,
374 address_type remote_address,
375 std::string text)
376{
377 boost::beast::http::response<boost::beast::http::empty_body> msg;
378 msg.version(request.version());
379 msg.result(boost::beast::http::status::bad_request);
380 msg.reason("Bad Request (" + text + ")");
381 msg.insert("Server", BuildInfo::getFullVersionString());
382 msg.insert("Remote-Address", remote_address.to_string());
383 msg.insert(boost::beast::http::field::connection, "close");
384 msg.prepare_payload();
385 return std::make_shared<SimpleWriter>(msg);
386}
387
388//------------------------------------------------------------------------------
389
390void
392{
393 XRPL_ASSERT(work_, "ripple::OverlayImpl::connect : work is set");
394
395 auto usage = resourceManager().newOutboundEndpoint(remote_endpoint);
396 if (usage.disconnect(journal_))
397 {
398 JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
399 return;
400 }
401
402 auto const slot = peerFinder().new_outbound_slot(remote_endpoint);
403 if (slot == nullptr)
404 {
405 JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint;
406 return;
407 }
408
409 auto const p = std::make_shared<ConnectAttempt>(
410 app_,
413 usage,
415 next_id_++,
416 slot,
417 app_.journal("Peer"),
418 *this);
419
421 list_.emplace(p.get(), p);
422 p->run();
423}
424
425//------------------------------------------------------------------------------
426
427// Adds a peer that is already handshaked and active
428void
430{
432
433 {
434 auto const result = m_peers.emplace(peer->slot(), peer);
435 XRPL_ASSERT(
436 result.second,
437 "ripple::OverlayImpl::add_active : peer is inserted");
438 (void)result.second;
439 }
440
441 {
442 auto const result = ids_.emplace(
444 std::make_tuple(peer->id()),
445 std::make_tuple(peer));
446 XRPL_ASSERT(
447 result.second,
448 "ripple::OverlayImpl::add_active : peer ID is inserted");
449 (void)result.second;
450 }
451
452 list_.emplace(peer.get(), peer);
453
454 JLOG(journal_.debug()) << "activated " << peer->getRemoteAddress() << " ("
455 << peer->id() << ":"
456 << toBase58(
457 TokenType::NodePublic, peer->getNodePublic())
458 << ")";
459
460 // As we are not on the strand, run() must be called
461 // while holding the lock, otherwise new I/O can be
462 // queued after a call to stop().
463 peer->run();
464}
465
466void
468{
470 auto const iter = m_peers.find(slot);
471 XRPL_ASSERT(
472 iter != m_peers.end(), "ripple::OverlayImpl::remove : valid input");
473 m_peers.erase(iter);
474}
475
476void
478{
480 app_.config(),
481 serverHandler_.setup().overlay.port(),
482 app_.getValidationPublicKey().has_value(),
484
485 m_peerFinder->setConfig(config);
486 m_peerFinder->start();
487
488 // Populate our boot cache: if there are no entries in [ips] then we use
489 // the entries in [ips_fixed].
490 auto bootstrapIps =
492
493 // If nothing is specified, default to several well-known high-capacity
494 // servers to serve as bootstrap:
495 if (bootstrapIps.empty())
496 {
497 // Pool of servers operated by Ripple Labs Inc. - https://ripple.com
498 bootstrapIps.push_back("r.ripple.com 51235");
499
500 // Pool of servers operated by ISRDC - https://isrdc.in
501 bootstrapIps.push_back("sahyadri.isrdc.in 51235");
502
503 // Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
504 bootstrapIps.push_back("hubs.xrpkuwait.com 51235");
505
506 // Pool of servers operated by XRPL Commons - https://xrpl-commons.org
507 bootstrapIps.push_back("hub.xrpl-commons.org 51235");
508 }
509
511 bootstrapIps,
512 [this](
513 std::string const& name,
514 std::vector<beast::IP::Endpoint> const& addresses) {
516 ips.reserve(addresses.size());
517 for (auto const& addr : addresses)
518 {
519 if (addr.port() == 0)
520 ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
521 else
522 ips.push_back(to_string(addr));
523 }
524
525 std::string const base("config: ");
526 if (!ips.empty())
527 m_peerFinder->addFallbackStrings(base + name, ips);
528 });
529
530 // Add the ips_fixed from the rippled.cfg file
532 {
535 [this](
536 std::string const& name,
537 std::vector<beast::IP::Endpoint> const& addresses) {
539 ips.reserve(addresses.size());
540
541 for (auto& addr : addresses)
542 {
543 if (addr.port() == 0)
544 ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
545 else
546 ips.emplace_back(addr);
547 }
548
549 if (!ips.empty())
550 m_peerFinder->addFixedPeer(name, ips);
551 });
552 }
553 auto const timer = std::make_shared<Timer>(*this);
555 list_.emplace(timer.get(), timer);
556 timer_ = timer;
557 timer->async_wait();
558}
559
560void
562{
564 {
565 std::unique_lock<decltype(mutex_)> lock(mutex_);
566 cond_.wait(lock, [this] { return list_.empty(); });
567 }
568 m_peerFinder->stop();
569}
570
571//------------------------------------------------------------------------------
572//
573// PropertyStream
574//
575//------------------------------------------------------------------------------
576
577void
579{
580 beast::PropertyStream::Set set("traffic", stream);
581 auto const stats = m_traffic.getCounts();
582 for (auto const& pair : stats)
583 {
585 item["category"] = pair.second.name;
586 item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
587 item["messages_in"] = std::to_string(pair.second.messagesIn.load());
588 item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
589 item["messages_out"] = std::to_string(pair.second.messagesOut.load());
590 }
591}
592
593//------------------------------------------------------------------------------
599void
601{
602 // Now track this peer
603 {
605 auto const result(ids_.emplace(
607 std::make_tuple(peer->id()),
608 std::make_tuple(peer)));
609 XRPL_ASSERT(
610 result.second,
611 "ripple::OverlayImpl::activate : peer ID is inserted");
612 (void)result.second;
613 }
614
615 JLOG(journal_.debug()) << "activated " << peer->getRemoteAddress() << " ("
616 << peer->id() << ":"
617 << toBase58(
618 TokenType::NodePublic, peer->getNodePublic())
619 << ")";
620
621 // We just accepted this peer so we have non-zero active peers
622 XRPL_ASSERT(size(), "ripple::OverlayImpl::activate : nonzero peers");
623}
624
625void
627{
629 ids_.erase(id);
630}
631
632void
635 std::shared_ptr<PeerImp> const& from)
636{
637 auto const n = m->list_size();
638 auto const& journal = from->pjournal();
639
640 protocol::TMManifests relay;
641
642 for (std::size_t i = 0; i < n; ++i)
643 {
644 auto& s = m->list().Get(i).stobject();
645
646 if (auto mo = deserializeManifest(s))
647 {
648 auto const serialized = mo->serialized;
649
650 auto const result =
651 app_.validatorManifests().applyManifest(std::move(*mo));
652
653 if (result == ManifestDisposition::accepted)
654 {
655 relay.add_list()->set_stobject(s);
656
657 // N.B.: this is important; the applyManifest call above moves
658 // the loaded Manifest out of the optional so we need to
659 // reload it here.
660 mo = deserializeManifest(serialized);
661 XRPL_ASSERT(
662 mo,
663 "ripple::OverlayImpl::onManifests : manifest "
664 "deserialization succeeded");
665
666 app_.getOPs().pubManifest(*mo);
667
668 if (app_.validators().listed(mo->masterKey))
669 {
670 auto db = app_.getWalletDB().checkoutDb();
671 addValidatorManifest(*db, serialized);
672 }
673 }
674 }
675 else
676 {
677 JLOG(journal.debug())
678 << "Malformed manifest #" << i + 1 << ": " << strHex(s);
679 continue;
680 }
681 }
682
683 if (!relay.list().empty())
684 for_each([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
685 std::shared_ptr<PeerImp>&& p) { p->send(m2); });
686}
687
688void
690{
691 m_traffic.addCount(cat, true, size);
692}
693
694void
696{
697 m_traffic.addCount(cat, false, size);
698}
705{
707 return ids_.size();
708}
709
710int
712{
713 return m_peerFinder->config().maxPeers;
714}
715
718{
719 using namespace std::chrono;
720 Json::Value jv;
721 auto& av = jv["active"] = Json::Value(Json::arrayValue);
722
724 auto& pv = av.append(Json::Value(Json::objectValue));
725 pv[jss::public_key] = base64_encode(
726 sp->getNodePublic().data(), sp->getNodePublic().size());
727 pv[jss::type] = sp->slot()->inbound() ? "in" : "out";
728 pv[jss::uptime] = static_cast<std::uint32_t>(
729 duration_cast<seconds>(sp->uptime()).count());
730 if (sp->crawl())
731 {
732 pv[jss::ip] = sp->getRemoteAddress().address().to_string();
733 if (sp->slot()->inbound())
734 {
735 if (auto port = sp->slot()->listening_port())
736 pv[jss::port] = *port;
737 }
738 else
739 {
740 pv[jss::port] = std::to_string(sp->getRemoteAddress().port());
741 }
742 }
743
744 {
745 auto version{sp->getVersion()};
746 if (!version.empty())
747 // Could move here if Json::value supported moving from strings
748 pv[jss::version] = std::string{version};
749 }
750
751 std::uint32_t minSeq, maxSeq;
752 sp->ledgerRange(minSeq, maxSeq);
753 if (minSeq != 0 || maxSeq != 0)
754 pv[jss::complete_ledgers] =
755 std::to_string(minSeq) + "-" + std::to_string(maxSeq);
756 });
757
758 return jv;
759}
760
763{
764 bool const humanReadable = false;
765 bool const admin = false;
766 bool const counters = false;
767
768 Json::Value server_info =
769 app_.getOPs().getServerInfo(humanReadable, admin, counters);
770
771 // Filter out some information
772 server_info.removeMember(jss::hostid);
773 server_info.removeMember(jss::load_factor_fee_escalation);
774 server_info.removeMember(jss::load_factor_fee_queue);
775 server_info.removeMember(jss::validation_quorum);
776
777 if (server_info.isMember(jss::validated_ledger))
778 {
779 Json::Value& validated_ledger = server_info[jss::validated_ledger];
780
781 validated_ledger.removeMember(jss::base_fee);
782 validated_ledger.removeMember(jss::reserve_base_xrp);
783 validated_ledger.removeMember(jss::reserve_inc_xrp);
784 }
785
786 return server_info;
787}
788
791{
792 return getCountsJson(app_, 10);
793}
794
797{
798 Json::Value validators = app_.validators().getJson();
799
800 if (validators.isMember(jss::publisher_lists))
801 {
802 Json::Value& publisher_lists = validators[jss::publisher_lists];
803
804 for (auto& publisher : publisher_lists)
805 {
806 publisher.removeMember(jss::list);
807 }
808 }
809
810 validators.removeMember(jss::signing_keys);
811 validators.removeMember(jss::trusted_validator_keys);
812 validators.removeMember(jss::validation_quorum);
813
814 Json::Value validatorSites = app_.validatorSites().getJson();
815
816 if (validatorSites.isMember(jss::validator_sites))
817 {
818 validators[jss::validator_sites] =
819 std::move(validatorSites[jss::validator_sites]);
820 }
821
822 return validators;
823}
824
825// Returns information on verified peers.
828{
830 for (auto const& peer : getActivePeers())
831 {
832 json.append(peer->json());
833 }
834 return json;
835}
836
837bool
839{
840 if (req.target() != "/crawl" ||
842 return false;
843
844 boost::beast::http::response<json_body> msg;
845 msg.version(req.version());
846 msg.result(boost::beast::http::status::ok);
847 msg.insert("Server", BuildInfo::getFullVersionString());
848 msg.insert("Content-Type", "application/json");
849 msg.insert("Connection", "close");
850 msg.body()["version"] = Json::Value(2u);
851
853 {
854 msg.body()["overlay"] = getOverlayInfo();
855 }
857 {
858 msg.body()["server"] = getServerInfo();
859 }
861 {
862 msg.body()["counts"] = getServerCounts();
863 }
865 {
866 msg.body()["unl"] = getUnlInfo();
867 }
868
869 msg.prepare_payload();
870 handoff.response = std::make_shared<SimpleWriter>(msg);
871 return true;
872}
873
874bool
876 http_request_type const& req,
877 Handoff& handoff)
878{
879 // If the target is in the form "/vl/<validator_list_public_key>",
880 // return the most recent validator list for that key.
881 constexpr std::string_view prefix("/vl/");
882
883 if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
884 return false;
885
886 std::uint32_t version = 1;
887
888 boost::beast::http::response<json_body> msg;
889 msg.version(req.version());
890 msg.insert("Server", BuildInfo::getFullVersionString());
891 msg.insert("Content-Type", "application/json");
892 msg.insert("Connection", "close");
893
894 auto fail = [&msg, &handoff](auto status) {
895 msg.result(status);
896 msg.insert("Content-Length", "0");
897
898 msg.body() = Json::nullValue;
899
900 msg.prepare_payload();
901 handoff.response = std::make_shared<SimpleWriter>(msg);
902 return true;
903 };
904
905 std::string_view key = req.target().substr(prefix.size());
906
907 if (auto slash = key.find('/'); slash != std::string_view::npos)
908 {
909 auto verString = key.substr(0, slash);
910 if (!boost::conversion::try_lexical_convert(verString, version))
911 return fail(boost::beast::http::status::bad_request);
912 key = key.substr(slash + 1);
913 }
914
915 if (key.empty())
916 return fail(boost::beast::http::status::bad_request);
917
918 // find the list
919 auto vl = app_.validators().getAvailable(key, version);
920
921 if (!vl)
922 {
923 // 404 not found
924 return fail(boost::beast::http::status::not_found);
925 }
926 else if (!*vl)
927 {
928 return fail(boost::beast::http::status::bad_request);
929 }
930 else
931 {
932 msg.result(boost::beast::http::status::ok);
933
934 msg.body() = *vl;
935
936 msg.prepare_payload();
937 handoff.response = std::make_shared<SimpleWriter>(msg);
938 return true;
939 }
940}
941
942bool
944{
945 if (req.target() != "/health")
946 return false;
947 boost::beast::http::response<json_body> msg;
948 msg.version(req.version());
949 msg.insert("Server", BuildInfo::getFullVersionString());
950 msg.insert("Content-Type", "application/json");
951 msg.insert("Connection", "close");
952
953 auto info = getServerInfo();
954
955 int last_validated_ledger_age = -1;
956 if (info.isMember(jss::validated_ledger))
957 last_validated_ledger_age =
958 info[jss::validated_ledger][jss::age].asInt();
959 bool amendment_blocked = false;
960 if (info.isMember(jss::amendment_blocked))
961 amendment_blocked = true;
962 int number_peers = info[jss::peers].asInt();
963 std::string server_state = info[jss::server_state].asString();
964 auto load_factor = info[jss::load_factor_server].asDouble() /
965 info[jss::load_base].asDouble();
966
967 enum { healthy, warning, critical };
968 int health = healthy;
969 auto set_health = [&health](int state) {
970 if (health < state)
971 health = state;
972 };
973
974 msg.body()[jss::info] = Json::objectValue;
975 if (last_validated_ledger_age >= 7 || last_validated_ledger_age < 0)
976 {
977 msg.body()[jss::info][jss::validated_ledger] =
978 last_validated_ledger_age;
979 if (last_validated_ledger_age < 20)
980 set_health(warning);
981 else
982 set_health(critical);
983 }
984
985 if (amendment_blocked)
986 {
987 msg.body()[jss::info][jss::amendment_blocked] = true;
988 set_health(critical);
989 }
990
991 if (number_peers <= 7)
992 {
993 msg.body()[jss::info][jss::peers] = number_peers;
994 if (number_peers != 0)
995 set_health(warning);
996 else
997 set_health(critical);
998 }
999
1000 if (!(server_state == "full" || server_state == "validating" ||
1001 server_state == "proposing"))
1002 {
1003 msg.body()[jss::info][jss::server_state] = server_state;
1004 if (server_state == "syncing" || server_state == "tracking" ||
1005 server_state == "connected")
1006 {
1007 set_health(warning);
1008 }
1009 else
1010 set_health(critical);
1011 }
1012
1013 if (load_factor > 100)
1014 {
1015 msg.body()[jss::info][jss::load_factor] = load_factor;
1016 if (load_factor < 1000)
1017 set_health(warning);
1018 else
1019 set_health(critical);
1020 }
1021
1022 switch (health)
1023 {
1024 case healthy:
1025 msg.result(boost::beast::http::status::ok);
1026 break;
1027 case warning:
1028 msg.result(boost::beast::http::status::service_unavailable);
1029 break;
1030 case critical:
1031 msg.result(boost::beast::http::status::internal_server_error);
1032 break;
1033 }
1034
1035 msg.prepare_payload();
1036 handoff.response = std::make_shared<SimpleWriter>(msg);
1037 return true;
1038}
1039
1040bool
1042{
1043 // Take advantage of || short-circuiting
1044 return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
1045 processHealth(req, handoff);
1046}
1047
1050{
1052 ret.reserve(size());
1053
1054 for_each([&ret](std::shared_ptr<PeerImp>&& sp) {
1055 ret.emplace_back(std::move(sp));
1056 });
1057
1058 return ret;
1059}
1060
1063 std::set<Peer::id_t> const& toSkip,
1064 std::size_t& active,
1065 std::size_t& disabled,
1066 std::size_t& enabledInSkip) const
1067{
1069 std::lock_guard lock(mutex_);
1070
1071 active = ids_.size();
1072 disabled = enabledInSkip = 0;
1073 ret.reserve(ids_.size());
1074
1075 // NOTE The purpose of p is to delay the destruction of PeerImp
1077 for (auto& [id, w] : ids_)
1078 {
1079 if (p = w.lock(); p != nullptr)
1080 {
1081 bool const reduceRelayEnabled = p->txReduceRelayEnabled();
1082 // tx reduced relay feature disabled
1083 if (!reduceRelayEnabled)
1084 ++disabled;
1085
1086 if (toSkip.count(id) == 0)
1087 ret.emplace_back(std::move(p));
1088 else if (reduceRelayEnabled)
1089 ++enabledInSkip;
1090 }
1091 }
1092
1093 return ret;
1094}
1095
1096void
1098{
1099 for_each(
1100 [index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
1101}
1102
1105{
1106 std::lock_guard lock(mutex_);
1107 auto const iter = ids_.find(id);
1108 if (iter != ids_.end())
1109 return iter->second.lock();
1110 return {};
1111}
1112
1113// A public key hash map was not used due to the peer connect/disconnect
1114// update overhead outweighing the performance of a small set linear search.
1117{
1118 std::lock_guard lock(mutex_);
1119 // NOTE The purpose of peer is to delay the destruction of PeerImp
1121 for (auto const& e : ids_)
1122 {
1123 if (peer = e.second.lock(); peer != nullptr)
1124 {
1125 if (peer->getNodePublic() == pubKey)
1126 return peer;
1127 }
1128 }
1129 return {};
1130}
1131
1132void
1133OverlayImpl::broadcast(protocol::TMProposeSet& m)
1134{
1135 auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
1136 for_each([&](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1137}
1138
1141 protocol::TMProposeSet& m,
1142 uint256 const& uid,
1143 PublicKey const& validator)
1144{
1145 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1146 {
1147 auto const sm =
1148 std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
1150 if (toSkip->find(p->id()) == toSkip->end())
1151 p->send(sm);
1152 });
1153 return *toSkip;
1154 }
1155 return {};
1156}
1157
1158void
1159OverlayImpl::broadcast(protocol::TMValidation& m)
1160{
1161 auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
1162 for_each([sm](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
1163}
1164
1167 protocol::TMValidation& m,
1168 uint256 const& uid,
1169 PublicKey const& validator)
1170{
1171 if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
1172 {
1173 auto const sm =
1174 std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
1176 if (toSkip->find(p->id()) == toSkip->end())
1177 p->send(sm);
1178 });
1179 return *toSkip;
1180 }
1181 return {};
1182}
1183
1186{
1188
1189 if (auto seq = app_.validatorManifests().sequence();
1190 seq != manifestListSeq_)
1191 {
1192 protocol::TMManifests tm;
1193
1195 [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
1196 [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
1197 tm.add_list()->set_stobject(
1198 manifest.serialized.data(), manifest.serialized.size());
1199 hr.addSuppression(manifest.hash());
1200 });
1201
1203
1204 if (tm.list_size() != 0)
1206 std::make_shared<Message>(tm, protocol::mtMANIFESTS);
1207
1208 manifestListSeq_ = seq;
1209 }
1210
1211 return manifestMessage_;
1212}
1213
1214void
1216 uint256 const& hash,
1218 std::set<Peer::id_t> const& toSkip)
1219{
1220 bool relay = tx.has_value();
1221 if (relay)
1222 {
1223 auto& txn = tx->get();
1224 SerialIter sit(makeSlice(txn.rawtransaction()));
1225 relay = !isPseudoTx(STTx{sit});
1226 }
1227
1228 Overlay::PeerSequence peers = {};
1229 std::size_t total = 0;
1230 std::size_t disabled = 0;
1231 std::size_t enabledInSkip = 0;
1232
1233 if (!relay)
1234 {
1236 return;
1237
1238 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1239 JLOG(journal_.trace())
1240 << "not relaying tx, total peers " << peers.size();
1241 for (auto const& p : peers)
1242 p->addTxQueue(hash);
1243 return;
1244 }
1245
1246 auto& txn = tx->get();
1247 auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
1248 peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
1249 auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
1250
1251 if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
1252 {
1253 for (auto const& p : peers)
1254 p->send(sm);
1257 txMetrics_.addMetrics(total, toSkip.size(), 0);
1258 return;
1259 }
1260
1261 // We have more peers than the minimum (disabled + minimum enabled),
1262 // relay to all disabled and some randomly selected enabled that
1263 // do not have the transaction.
1264 auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
1265 (total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
1266
1267 txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
1268
1269 if (enabledTarget > enabledInSkip)
1270 std::shuffle(peers.begin(), peers.end(), default_prng());
1271
1272 JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
1273 << " selected " << enabledTarget << " skip "
1274 << toSkip.size() << " disabled " << disabled;
1275
1276 // count skipped peers with the enabled feature towards the quota
1277 std::uint16_t enabledAndRelayed = enabledInSkip;
1278 for (auto const& p : peers)
1279 {
1280 // always relay to a peer with the disabled feature
1281 if (!p->txReduceRelayEnabled())
1282 {
1283 p->send(sm);
1284 }
1285 else if (enabledAndRelayed < enabledTarget)
1286 {
1287 enabledAndRelayed++;
1288 p->send(sm);
1289 }
1290 else
1291 {
1292 p->addTxQueue(hash);
1293 }
1294 }
1295}
1296
1297//------------------------------------------------------------------------------
1298
1299void
1301{
1302 std::lock_guard lock(mutex_);
1303 list_.erase(&child);
1304 if (list_.empty())
1305 cond_.notify_all();
1306}
1307
// Calls stop() on every child currently tracked in list_.
// NOTE(review): listing line 1309 (the function name) is missing from this
// extract; presumably OverlayImpl::stopChildren(). Confirm against the real
// source.
1308void
1310{
1311 // Calling list_[].second->stop() may cause list_ to be modified
1312 // (OverlayImpl::remove() may be called on this same thread). So
1313 // iterating directly over list_ to call child->stop() could lead to
1314 // undefined behavior.
1315 //
1316 // Therefore we copy all of the weak/shared ptrs out of list_ before we
1317 // start calling stop() on them. That guarantees OverlayImpl::remove()
1318 // won't be called until vector<> children leaves scope.
// NOTE(review): listing line 1319, which presumably declares the `children`
// vector used below, is missing from this extract.
1320 {
1321 std::lock_guard lock(mutex_);
// work_ is cleared the first time through, so a later call finds it empty
// and returns without touching the children again.
1322 if (!work_)
1323 return;
1324 work_ = std::nullopt;
1325
1326 children.reserve(list_.size());
1327 for (auto const& element : list_)
1328 {
// lock() yields a null pointer for an expired weak_ptr; the loop further
// down checks for null before calling stop().
1329 children.emplace_back(element.second.lock());
1330 }
1331 } // lock released
1332
1333 for (auto const& child : children)
1334 {
1335 if (child != nullptr)
1336 child->stop();
1337 }
1338}
1339
// Asks m_peerFinder->autoconnect() for a list of addresses and calls
// connect() on each one.
// NOTE(review): listing line 1341 (the function name) is missing from this
// extract; presumably OverlayImpl::autoConnect(). Confirm against the real
// source.
1340void
1342{
1343 auto const result = m_peerFinder->autoconnect();
1344 for (auto addr : result)
1345 connect(addr);
1346}
1347
// For every entry produced by m_peerFinder->buildEndpointsForPeers(), looks
// up the entry's key (e.first) in m_peers and, if the stored weak pointer can
// still be locked, hands the entry's endpoint range (e.second) to that peer
// via sendEndpoints().
// NOTE(review): listing line 1349 (the function name) is missing from this
// extract; presumably OverlayImpl::sendEndpoints(). Confirm against the real
// source.
1348void
1350{
1351 auto const result = m_peerFinder->buildEndpointsForPeers();
1352 for (auto const& e : result)
1353 {
// NOTE(review): listing line 1354, which presumably declares `peer`, is
// missing from this extract.
// mutex_ is held only for the map lookup; it is released at the closing brace
// below, before the peer is asked to send anything.
1355 {
1356 std::lock_guard lock(mutex_);
1357 auto const iter = m_peers.find(e.first);
1358 if (iter != m_peers.end())
1359 peer = iter->second.lock();
1360 }
1361 if (peer)
1362 peer->sendEndpoints(e.second.begin(), e.second.end());
1363 }
1364}
1365
// Visits every peer through for_each() and calls sendTxQueue() on those that
// report txReduceRelayEnabled().
// NOTE(review): listing line 1367 (the function name) is missing from this
// extract; presumably OverlayImpl::sendTxQueue(). Confirm against the real
// source.
1366void
1368{
1369 for_each([](auto const& p) {
1370 if (p->txReduceRelayEnabled())
1371 p->sendTxQueue();
1372 });
1373}
1374
1377 PublicKey const& validator,
1378 bool squelch,
1379 uint32_t squelchDuration)
1380{
// Builds a protocol::TMSquelch carrying the squelch flag and the validator's
// public key bytes, and returns it wrapped in a Message of type mtSQUELCH.
// NOTE(review): listing lines 1375-1376 (return type and function name) are
// missing from this extract; the callers in this file refer to it as
// makeSquelchMessage.
1381 protocol::TMSquelch m;
1382 m.set_squelch(squelch);
1383 m.set_validatorpubkey(validator.data(), validator.size());
// The duration field is only populated when squelching; for an unsquelch
// message it is left unset.
1384 if (squelch)
1385 m.set_squelchduration(squelchDuration);
1386 return std::make_shared<Message>(m, protocol::mtSQUELCH);
1387}
1388
// Looks up the peer with short id `id` and, if found, sends it a squelch
// message for `validator` with the squelch flag cleared (duration 0).
// Does nothing when no such peer is found.
// NOTE(review): listing line 1390 (the function name and parameter list) is
// missing from this extract; presumably OverlayImpl::unsquelch(). Confirm
// against the real source.
1389void
1391{
1392 if (auto peer = findPeerByShortID(id); peer)
1393 {
1394 // optimize - multiple message with different
1395 // validator might be sent to the same peer
1396 peer->send(makeSquelchMessage(validator, false, 0));
1397 }
1398}
1399
// Looks up the peer with short id `id` and, if found, sends it a squelch
// message for `validator` with the squelch flag set and the given
// `squelchDuration`. Does nothing when no such peer is found.
// NOTE(review): listing line 1401 (the function name) is missing from this
// extract; presumably OverlayImpl::squelch(). Confirm against the real
// source.
1400void
1402 PublicKey const& validator,
1403 Peer::id_t id,
1404 uint32_t squelchDuration) const
1405{
1406 if (auto peer = findPeerByShortID(id); peer)
1407 {
1408 peer->send(makeSquelchMessage(validator, true, squelchDuration));
1409 }
1410}
1411
// Set-of-peers overload: forwards (key, validator, type) to
// slots_.updateSlotAndSquelch() once for every peer id in `peers`.
// Returns immediately when slots_.baseSquelchReady() is false.
// NOTE(review): listing line 1413 (the function name) is missing from this
// extract; the recursive call in the body names it updateSlotAndSquelch.
1412void
1414 uint256 const& key,
1415 PublicKey const& validator,
1416 std::set<Peer::id_t>&& peers,
1417 protocol::MessageType type)
1418{
1419 if (!slots_.baseSquelchReady())
1420 return;
1421
// When called from outside strand_, re-post the same call onto strand_ and
// return; the slot update below therefore only ever runs on strand_.
1422 if (!strand_.running_in_this_thread())
1423 return post(
1424 strand_,
1425 // Must capture copies of reference parameters (i.e. key, validator)
1426 [this,
1427 key = key,
1428 validator = validator,
1429 peers = std::move(peers),
1430 type]() mutable {
1431 updateSlotAndSquelch(key, validator, std::move(peers), type);
1432 });
1433
1434 for (auto id : peers)
1435 slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
// NOTE(review): listing line 1436, the body of this callback, is missing from
// this extract.
1437 });
1438}
1439
// Single-peer overload: forwards (key, validator, peer, type) to
// slots_.updateSlotAndSquelch(). Returns immediately when
// slots_.baseSquelchReady() is false.
// NOTE(review): listing line 1441 (the function name) is missing from this
// extract; the recursive call in the body names it updateSlotAndSquelch.
1440void
1442 uint256 const& key,
1443 PublicKey const& validator,
1444 Peer::id_t peer,
1445 protocol::MessageType type)
1446{
1447 if (!slots_.baseSquelchReady())
1448 return;
1449
// When called from outside strand_, re-post the same call onto strand_ and
// return; the slot update below therefore only ever runs on strand_.
1450 if (!strand_.running_in_this_thread())
1451 return post(
1452 strand_,
1453 // Must capture copies of reference parameters (i.e. key, validator)
1454 [this, key = key, validator = validator, peer, type]() {
1455 updateSlotAndSquelch(key, validator, peer, type);
1456 });
1457
1458 slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
// NOTE(review): listing line 1459, the body of this callback, is missing from
// this extract.
1460 });
1461}
1462
// Forwards the removal of peer `id` to slots_.deletePeer(id, true).
// NOTE(review): listing line 1464 (the function name and parameter list) is
// missing from this extract; the std::bind below names it
// OverlayImpl::deletePeer.
1463void
1465{
// When called from outside strand_, re-post the call onto strand_ and return.
1466 if (!strand_.running_in_this_thread())
1467 return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
1468
1469 slots_.deletePeer(id, true);
1470}
1471
// Forwards to slots_.deleteIdlePeers().
// NOTE(review): listing line 1473 (the function name) is missing from this
// extract; the std::bind below names it OverlayImpl::deleteIdlePeers.
1472void
1474{
// When called from outside strand_, re-post the call onto strand_ and return.
1475 if (!strand_.running_in_this_thread())
1476 return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this));
1477
1478 slots_.deleteIdlePeers();
1479}
1480
1481//------------------------------------------------------------------------------
1482
1485{
// Builds and returns an Overlay::Setup populated from `config`, reading the
// [overlay], [crawl] and [vl] sections plus the legacy network_id value.
// Throws std::runtime_error when a configured value is rejected.
// NOTE(review): listing lines 1483-1484 (return type, function name and
// parameter list) are missing from this extract; presumably
// Overlay::Setup setup_Overlay(BasicConfig const& config). Confirm against
// the real source.
1486 Overlay::Setup setup;
1487
// [overlay]: SSL context, per-IP limit and optional public IP.
1488 {
1489 auto const& section = config.section("overlay");
1490 setup.context = make_SSLContext("");
1491
1492 set(setup.ipLimit, "ip_limit", section);
1493 if (setup.ipLimit < 0)
1494 Throw<std::runtime_error>("Configured IP limit is invalid");
1495
1496 std::string ip;
1497 set(ip, "public_ip", section);
1498 if (!ip.empty())
1499 {
// A configured public_ip must parse as an address and must not be a private
// address; otherwise the configuration is rejected.
1500 boost::system::error_code ec;
1501 setup.public_ip = beast::IP::Address::from_string(ip, ec);
1502 if (ec || beast::IP::is_private(setup.public_ip))
1503 Throw<std::runtime_error>("Configured public IP is invalid");
1504 }
1505 }
1506
// [crawl]: at most one bare value (an on/off switch), plus per-topic keys.
1507 {
1508 auto const& section = config.section("crawl");
1509 auto const& values = section.values();
1510
1511 if (values.size() > 1)
1512 {
1513 Throw<std::runtime_error>(
1514 "Configured [crawl] section is invalid, too many values");
1515 }
1516
// Crawling stays enabled when the section has no bare value.
1517 bool crawlEnabled = true;
1518
1519 // Only allow "0|1" as a value
1520 if (values.size() == 1)
1521 {
1522 try
1523 {
1524 crawlEnabled = boost::lexical_cast<bool>(values.front());
1525 }
1526 catch (boost::bad_lexical_cast const&)
1527 {
1528 Throw<std::runtime_error>(
1529 "Configured [crawl] section has invalid value: " +
1530 values.front());
1531 }
1532 }
1533
// NOTE(review): the statement inside each of the four branches below (listing
// lines 1538, 1542, 1546 and 1550) is missing from this extract; presumably
// each one sets a flag in setup.crawlOptions. Confirm against the real
// source. The defaults passed to get<bool>() are: overlay=true, server=true,
// counts=false, unl=true.
1534 if (crawlEnabled)
1535 {
1536 if (get<bool>(section, "overlay", true))
1537 {
1539 }
1540 if (get<bool>(section, "server", true))
1541 {
1543 }
1544 if (get<bool>(section, "counts", false))
1545 {
1547 }
1548 if (get<bool>(section, "unl", true))
1549 {
1551 }
1552 }
1553 }
// [vl]: the "enabled" key is read into setup.vlEnabled.
1554 {
1555 auto const& section = config.section("vl");
1556
1557 set(setup.vlEnabled, "enabled", section);
1558 }
1559
// network_id: the names "main", "testnet" and "devnet" are mapped to "0", "1"
// and "2" before the value is parsed as an unsigned 32-bit integer. Any
// exception raised inside the try block is reported as a single
// configuration error.
1560 try
1561 {
1562 auto id = config.legacy("network_id");
1563
1564 if (!id.empty())
1565 {
1566 if (id == "main")
1567 id = "0";
1568
1569 if (id == "testnet")
1570 id = "1";
1571
1572 if (id == "devnet")
1573 id = "2";
1574
1575 setup.networkID = beast::lexicalCastThrow<std::uint32_t>(id);
1576 }
1577 }
1578 catch (...)
1579 {
1580 Throw<std::runtime_error>(
1581 "Configured [network_id] section is invalid: must be a number "
1582 "or one of the strings 'main', 'testnet' or 'devnet'.");
1583 }
1584
1585 return setup;
1586}
1587
1590 Application& app,
1591 Overlay::Setup const& setup,
1592 ServerHandler& serverHandler,
1593 Resource::Manager& resourceManager,
1594 Resolver& resolver,
1595 boost::asio::io_service& io_service,
1596 BasicConfig const& config,
1597 beast::insight::Collector::ptr const& collector)
1598{
// Factory: constructs an OverlayImpl from the arguments, passed through in
// the same order, and returns it in a std::unique_ptr.
// NOTE(review): listing lines 1588-1589 (return type and function name) are
// missing from this extract; presumably std::unique_ptr<Overlay>
// make_Overlay(...). Confirm against the real source.
1599 return std::make_unique<OverlayImpl>(
1600 app,
1601 setup,
1602 serverHandler,
1603 resourceManager,
1604 resolver,
1605 io_service,
1606 config,
1607 collector);
1608}
1609
1610} // namespace ripple
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition: json_value.h:149
Value & append(Value const &value)
Append value to array at the end.
Definition: json_value.cpp:910
Value removeMember(char const *key)
Remove and return the named member.
Definition: json_value.cpp:935
bool isMember(char const *key) const
Return true if the object has a member named key.
Definition: json_value.cpp:962
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
A generic endpoint for log messages.
Definition: Journal.h:60
Stream debug() const
Definition: Journal.h:328
Stream info() const
Definition: Journal.h:334
Stream trace() const
Severity stream access functions.
Definition: Journal.h:322
std::string const & name() const
Returns the name of this source.
void add(Source &source)
Add a child source.
Wraps a Journal::Sink to prefix its output with a string.
Definition: WrappedSink.h:34
virtual Config & config()=0
virtual beast::Journal journal(std::string const &name)=0
virtual ValidatorSite & validatorSites()=0
virtual DatabaseCon & getWalletDB()=0
Retrieve the "wallet database".
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual ManifestCache & validatorManifests()=0
virtual PeerReservationTable & peerReservations()=0
virtual Cluster & cluster()=0
virtual Logs & logs()=0
virtual HashRouter & getHashRouter()=0
Holds unparsed configuration information.
Definition: BasicConfig.h:218
Section & section(std::string const &name)
Returns the section with the given name.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition: Cluster.cpp:38
std::vector< std::string > IPS_FIXED
Definition: Config.h:144
std::vector< std::string > IPS
Definition: Config.h:141
bool standalone() const
Definition: Config.h:336
std::size_t TX_REDUCE_RELAY_MIN_PEERS
Definition: Config.h:268
bool TX_REDUCE_RELAY_ENABLE
Definition: Config.h:258
bool TX_REDUCE_RELAY_METRICS
Definition: Config.h:265
std::size_t TX_RELAY_PERCENTAGE
Definition: Config.h:271
LockedSociSession checkoutDb()
Definition: DatabaseCon.h:190
std::optional< std::set< PeerShortID > > shouldRelay(uint256 const &key)
Determines whether the hashed item should be relayed.
Definition: HashRouter.cpp:123
virtual void pubManifest(Manifest const &)=0
std::uint32_t sequence() const
A monotonically increasing number used to detect new manifests.
Definition: Manifest.h:278
void for_each_manifest(Function &&f) const
Invokes the callback once for every populated manifest.
Definition: Manifest.h:425
ManifestDisposition applyManifest(Manifest m)
Add manifest to cache.
Definition: Manifest.cpp:383
virtual Json::Value getServerInfo(bool human, bool admin, bool counters)=0
Child(OverlayImpl &overlay)
Definition: OverlayImpl.cpp:59
boost::system::error_code error_code
Definition: OverlayImpl.h:83
Json::Value getUnlInfo()
Returns information about the local server's UNL.
void stop() override
static std::string makePrefix(std::uint32_t id)
PeerFinder::Manager & peerFinder()
Definition: OverlayImpl.h:161
boost::asio::ip::tcp::endpoint endpoint_type
Definition: OverlayImpl.h:82
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
boost::asio::ip::address address_type
Definition: OverlayImpl.h:81
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
Definition: OverlayImpl.h:320
std::condition_variable_any cond_
Definition: OverlayImpl.h:107
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
Resolver & m_resolver
Definition: OverlayImpl.h:118
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void start() override
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
Definition: OverlayImpl.h:116
void add_active(std::shared_ptr< PeerImp > const &peer)
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
Resource::Manager & m_resourceManager
Definition: OverlayImpl.h:113
std::shared_ptr< Message > manifestMessage_
Definition: OverlayImpl.h:131
std::optional< std::uint32_t > manifestListSeq_
Definition: OverlayImpl.h:133
TrafficCount m_traffic
Definition: OverlayImpl.h:115
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
Definition: OverlayImpl.h:125
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
std::atomic< Peer::id_t > next_id_
Definition: OverlayImpl.h:119
boost::asio::io_service & io_service_
Definition: OverlayImpl.h:103
Application & app_
Definition: OverlayImpl.h:102
std::weak_ptr< Timer > timer_
Definition: OverlayImpl.h:108
metrics::TxMetrics txMetrics_
Definition: OverlayImpl.h:128
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
Definition: OverlayImpl.h:135
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
std::recursive_mutex mutex_
Definition: OverlayImpl.h:106
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void sendTxQueue()
Send once a second transactions' hashes aggregated by peers.
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
std::size_t size() const override
The number of active peers on the network Active peers are only those peers that have completed the h...
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
void for_each(UnaryFunc &&f) const
Definition: OverlayImpl.h:278
std::optional< boost::asio::io_service::work > work_
Definition: OverlayImpl.h:104
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
Resource::Manager & resourceManager()
Definition: OverlayImpl.h:167
static bool isPeerUpgrade(http_request_type const &request)
Json::Value getServerCounts()
Returns information about the local server's performance counters.
void reportInboundTraffic(TrafficCount::category cat, int bytes)
boost::asio::io_service::strand strand_
Definition: OverlayImpl.h:105
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition: OverlayImpl.h:114
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
Setup const & setup() const
Definition: OverlayImpl.h:173
std::shared_ptr< Message > getManifestsMessage()
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Definition: OverlayImpl.h:117
Json::Value getServerInfo()
Returns information about the local server.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
Json::Value json() override
Return diagnostics on the status of all peers.
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
ServerHandler & serverHandler_
Definition: OverlayImpl.h:112
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
int limit() override
Returns the maximum number of peers we are configured to allow.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
OverlayImpl(Application &app, Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_service &io_service, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
beast::Journal const journal_
Definition: OverlayImpl.h:111
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition: OverlayImpl.h:109
Manages the set of connected peers.
Definition: Overlay.h:49
virtual std::shared_ptr< Slot > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)=0
Create a new outbound slot with the specified remote endpoint.
bool contains(PublicKey const &nodeId)
A public key.
Definition: PublicKey.h:61
void resolve(std::vector< std::string > const &names, Handler handler)
resolve all hostnames on the list
Definition: Resolver.h:57
Tracks load and resource consumption.
virtual Consumer newInboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
virtual Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)=0
Create a new endpoint keyed by outbound IP address and port.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:79
void setup(Setup const &setup, beast::Journal journal)
auto const & getCounts() const
An up-to-date copy of all the counters.
Definition: TrafficCount.h:243
void addCount(category cat, bool inbound, int bytes)
Account for traffic associated with the given category.
Definition: TrafficCount.h:214
bool listed(PublicKey const &identity) const
Returns true if public key is included on any lists.
std::optional< Json::Value > getAvailable(std::string_view pubKey, std::optional< std::uint32_t > forceVersion={})
Returns the current valid list for the given publisher key, if available, as a Json object.
Json::Value getJson() const
Return a JSON representation of the state of the validator list.
Json::Value getJson() const
Return JSON representation of configured validator sites.
T count(T... args)
T data(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T find_if(T... args)
T get(T... args)
T make_tuple(T... args)
@ nullValue
'null' value
Definition: json_value.h:38
@ arrayValue
array value (ordered list)
Definition: json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:45
bool is_private(Address const &addr)
Returns true if the address is a private unroutable address.
Definition: IPAddress.h:71
Result split_commas(FwdIt first, FwdIt last)
Definition: rfc2616.h:199
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition: rfc2616.h:386
std::string const & getFullVersionString()
Full server version string.
Definition: BuildInfo.cpp:81
@ checkIdlePeers
How often we check for idle peers (seconds)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:25
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition: AccountID.cpp:114
std::optional< ProtocolVersion > negotiateProtocolVersion(std::vector< ProtocolVersion > const &versions)
Given a list of supported protocol versions, choose the one we prefer.
std::optional< Manifest > deserializeManifest(Slice s, beast::Journal journal)
Constructs Manifest from serialized string.
Definition: Manifest.cpp:54
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition: STTx.cpp:820
void addValidatorManifest(soci::session &session, std::string const &serialized)
addValidatorManifest Saves the manifest of a validator to the database.
Definition: Wallet.cpp:119
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
Definition: BasicConfig.h:315
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
Definition: Handshake.cpp:146
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
std::shared_ptr< Message > makeSquelchMessage(PublicKey const &validator, bool squelch, uint32_t squelchDuration)
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
@ accepted
Manifest is valid.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:244
std::string base64_encode(std::uint8_t const *data, std::size_t len)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition: chrono.h:119
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handoff.h:33
Json::Value getCountsJson(Application &app, int minObjectCount)
Definition: GetCounts.cpp:62
std::string to_string(base_uint< Bits, Tag > const &a)
Definition: base_uint.h:630
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
Definition: Handshake.cpp:227
@ manifest
Manifest.
Overlay::Setup setup_Overlay(BasicConfig const &config)
constexpr Number squelch(Number const &x, Number const &limit) noexcept
Definition: Number.h:363
std::unique_ptr< Overlay > make_Overlay(Application &app, Overlay::Setup const &setup, ServerHandler &serverHandler, Resource::Manager &resourceManager, Resolver &resolver, boost::asio::io_service &io_service, BasicConfig const &config, beast::insight::Collector::ptr const &collector)
Creates the implementation of Overlay.
beast::xor_shift_engine & default_prng()
Return the default random engine.
STL namespace.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T reserve(T... args)
T reset(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
static boost::asio::ip::tcp::endpoint to_asio_endpoint(IP::Endpoint const &address)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition: Handoff.h:40
bool keep_alive
Definition: Handoff.h:46
std::shared_ptr< Writer > response
Definition: Handoff.h:49
void on_timer(error_code ec)
Definition: OverlayImpl.cpp:93
Timer(OverlayImpl &overlay)
Definition: OverlayImpl.cpp:70
beast::IP::Address public_ip
Definition: Overlay.h:69
std::uint32_t crawlOptions
Definition: Overlay.h:71
std::shared_ptr< boost::asio::ssl::context > context
Definition: Overlay.h:68
std::optional< std::uint32_t > networkID
Definition: Overlay.h:72
PeerFinder configuration settings.
static Config makeConfig(ripple::Config const &config, std::uint16_t port, bool validationPublicKey, int ipLimit)
Make PeerFinder::Config from configuration parameters.
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition: TxMetrics.cpp:31
T substr(T... args)
T to_string(T... args)
T what(T... args)