rippled
Loading...
Searching...
No Matches
OverlayImpl.h
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#ifndef RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED
21#define RIPPLE_OVERLAY_OVERLAYIMPL_H_INCLUDED
22
23#include <xrpld/app/main/Application.h>
24#include <xrpld/core/Job.h>
25#include <xrpld/overlay/Message.h>
26#include <xrpld/overlay/Overlay.h>
27#include <xrpld/overlay/Slot.h>
28#include <xrpld/overlay/detail/Handshake.h>
29#include <xrpld/overlay/detail/TrafficCount.h>
30#include <xrpld/overlay/detail/TxMetrics.h>
31#include <xrpld/peerfinder/PeerfinderManager.h>
32#include <xrpld/rpc/ServerHandler.h>
33
34#include <xrpl/basics/Resolver.h>
35#include <xrpl/basics/UnorderedContainers.h>
36#include <xrpl/basics/chrono.h>
37#include <xrpl/beast/utility/instrumentation.h>
38#include <xrpl/resource/ResourceManager.h>
39#include <xrpl/server/Handoff.h>
40
41#include <boost/asio/basic_waitable_timer.hpp>
42#include <boost/asio/ip/tcp.hpp>
43#include <boost/asio/ssl/context.hpp>
44#include <boost/asio/strand.hpp>
45#include <boost/container/flat_map.hpp>
46
47#include <atomic>
48#include <chrono>
49#include <condition_variable>
50#include <cstdint>
51#include <memory>
52#include <mutex>
53#include <optional>
54#include <unordered_map>
55
56namespace ripple {
57
58class PeerImp;
59class BasicConfig;
60
62{
63public:
// Abstract nested base class: it declares a pure virtual stop(), so only
// derived types that implement stop() can be instantiated.
// NOTE(review): this listing skips original line 67, so the member list shown
// here may be incomplete.
64 class Child
65 {
66 protected:
68
// Protected: only derived types can construct a Child. The constructor
// receives a reference to the OverlayImpl.
69 explicit Child(OverlayImpl& overlay);
70
// Virtual and protected: not callable from outside the class hierarchy.
71 virtual ~Child();
72
73 public:
// Pure virtual; every concrete derived type must provide it.
74 virtual void
75 stop() = 0;
76 };
77
78private:
80 using socket_type = boost::asio::ip::tcp::socket;
81 using address_type = boost::asio::ip::address;
82 using endpoint_type = boost::asio::ip::tcp::endpoint;
83 using error_code = boost::system::error_code;
84
86 {
87 boost::asio::basic_waitable_timer<clock_type> timer_;
88 bool stopping_{false};
89
90 explicit Timer(OverlayImpl& overlay);
91
92 void
93 stop() override;
94
95 void
96 async_wait();
97
98 void
100 };
101
103 boost::asio::io_service& io_service_;
105 boost::asio::io_service::strand strand_;
106 mutable std::recursive_mutex mutex_; // VFALCO use std::mutex
109 boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;
124
126
127 // Transaction reduce-relay metrics
129
130 // A message with the list of manifests we send to peers
132 // Used to track whether we need to update the cached list of manifests
134 // Protects the message and the sequence list of manifests
136
137 //--------------------------------------------------------------------------
138
139public:
141 Application& app,
142 Setup const& setup,
143 ServerHandler& serverHandler,
145 Resolver& resolver,
146 boost::asio::io_service& io_service,
147 BasicConfig const& config,
148 beast::insight::Collector::ptr const& collector);
149
150 OverlayImpl(OverlayImpl const&) = delete;
152 operator=(OverlayImpl const&) = delete;
153
154 void
155 start() override;
156
157 void
158 stop() override;
159
162 {
163 return *m_peerFinder;
164 }
165
168 {
169 return m_resourceManager;
170 }
171
172 Setup const&
173 setup() const
174 {
175 return setup_;
176 }
177
178 Handoff
179 onHandoff(
181 http_request_type&& request,
182 endpoint_type remote_endpoint) override;
183
// Establish a peer connection to the specified endpoint.
184 void
185 connect(beast::IP::Endpoint const& remote_endpoint) override;
186
// Returns the maximum number of peers we are configured to allow.
187 int
188 limit() override;
189
191 size() const override;
192
194 json() override;
195
197 getActivePeers() const override;
198
210 std::set<Peer::id_t> const& toSkip,
211 std::size_t& active,
212 std::size_t& disabled,
213 std::size_t& enabledInSkip) const;
214
215 void checkTracking(std::uint32_t) override;
216
218 findPeerByShortID(Peer::id_t const& id) const override;
219
221 findPeerByPublicKey(PublicKey const& pubKey) override;
222
// Broadcast a proposal.
223 void
224 broadcast(protocol::TMProposeSet& m) override;
225
// Overload of broadcast() that takes a protocol::TMValidation message.
226 void
227 broadcast(protocol::TMValidation& m) override;
228
230 relay(
231 protocol::TMProposeSet& m,
232 uint256 const& uid,
233 PublicKey const& validator) override;
234
236 relay(
237 protocol::TMValidation& m,
238 uint256 const& uid,
239 PublicKey const& validator) override;
240
241 void
242 relay(
243 uint256 const&,
245 std::set<Peer::id_t> const& skip) override;
246
249
250 //--------------------------------------------------------------------------
251 //
252 // OverlayImpl
253 //
254
255 void
257
258 void
260
266 void
268
269 // Called when an active peer is destroyed.
270 void
272
273 // Calls f once for every entry of ids_ whose weak pointer can still be
274 // locked. UnaryFunc is invoked as void(std::shared_ptr<PeerImp>&&).
275 //
276 template <class UnaryFunc>
277 void
278 for_each(UnaryFunc&& f) const
279 {
281 {
283
284 // Snapshot the weak pointers held in ids_ into wp before calling f,
285 // because peer destruction can invalidate iterators.
286 wp.reserve(ids_.size());
287
288 for (auto& x : ids_)
289 wp.push_back(x.second);
290 }
291
292 for (auto& w : wp)
293 {
// A weak pointer that no longer locks is skipped; f receives the locked
// shared pointer by move.
294 if (auto p = w.lock())
295 f(std::move(p));
296 }
297 }
298
299 // Called when TMManifests is received from a peer
300 void
303 std::shared_ptr<PeerImp> const& from);
304
305 static bool
306 isPeerUpgrade(http_request_type const& request);
307
308 template <class Body>
309 static bool
310 isPeerUpgrade(boost::beast::http::response<Body> const& response)
311 {
312 if (!is_upgrade(response))
313 return false;
314 return response.result() ==
315 boost::beast::http::status::switching_protocols;
316 }
317
318 template <class Fields>
319 static bool
320 is_upgrade(boost::beast::http::header<true, Fields> const& req)
321 {
322 if (req.version() < 11)
323 return false;
324 if (req.method() != boost::beast::http::verb::get)
325 return false;
326 if (!boost::beast::http::token_list{req["Connection"]}.exists(
327 "upgrade"))
328 return false;
329 return true;
330 }
331
332 template <class Fields>
333 static bool
334 is_upgrade(boost::beast::http::header<false, Fields> const& req)
335 {
336 if (req.version() < 11)
337 return false;
338 if (!boost::beast::http::token_list{req["Connection"]}.exists(
339 "upgrade"))
340 return false;
341 return true;
342 }
343
344 static std::string
346
347 void
348 reportTraffic(TrafficCount::category cat, bool isInbound, int bytes);
349
350 void
352 {
354 }
355
357 getJqTransOverflow() const override
358 {
359 return jqTransOverflow_;
360 }
361
362 void
364 {
366 }
367
369 getPeerDisconnect() const override
370 {
371 return peerDisconnects_;
372 }
373
374 void
376 {
378 }
379
382 {
384 }
385
387 networkID() const override
388 {
389 return setup_.networkID;
390 }
391
401 void
403 uint256 const& key,
404 PublicKey const& validator,
405 std::set<Peer::id_t>&& peers,
406 protocol::MessageType type);
407
410 void
412 uint256 const& key,
413 PublicKey const& validator,
414 Peer::id_t peer,
415 protocol::MessageType type);
416
422 void
424
426 txMetrics() const override
427 {
428 return txMetrics_.json();
429 }
430
432 template <typename... Args>
433 void
434 addTxMetrics(Args... args)
435 {
436 if (!strand_.running_in_this_thread())
437 return post(
438 strand_,
439 std::bind(&OverlayImpl::addTxMetrics<Args...>, this, args...));
440
441 txMetrics_.addMetrics(args...);
442 }
443
444private:
// Squelch handler. Receives the validator's public key, a peer id and a
// squelch duration.
// NOTE(review): the unit of squelchDuration is not visible in this listing.
445 void
446 squelch(
447 PublicKey const& validator,
448 Peer::id_t const id,
449 std::uint32_t squelchDuration) const override;
450
// Unsquelch handler. Receives the validator's public key and a peer id.
451 void
452 unsquelch(PublicKey const& validator, Peer::id_t id) const override;
453
457 http_request_type const& request,
458 address_type remote_address);
459
463 http_request_type const& request,
464 address_type remote_address,
465 std::string msg);
466
// Handles crawl requests.
// NOTE(review): the meaning of the bool result is not visible in this
// listing; confirm against the implementation.
472 bool
473 processCrawl(http_request_type const& req, Handoff& handoff);
474
// Handles validator list requests.
482 bool
483 processValidatorList(http_request_type const& req, Handoff& handoff);
484
// Handles health requests.
490 bool
491 processHealth(http_request_type const& req, Handoff& handoff);
492
// Handles non-peer protocol requests.
497 bool
498 processRequest(http_request_type const& req, Handoff& handoff);
499
506
513
520
526 getUnlInfo();
527
528 //--------------------------------------------------------------------------
529
530 //
531 // PropertyStream
532 //
533
534 void
535 onWrite(beast::PropertyStream::Map& stream) override;
536
537 //--------------------------------------------------------------------------
538
539 void
540 remove(Child& child);
541
542 void
543 stopChildren();
544
545 void
546 autoConnect();
547
548 void
550
// Sends the transaction hashes aggregated by peers; per the generated
// documentation this happens once a second.
552 void
553 sendTxQueue();
554
557 void
559
560private:
562 {
564 char const* name,
565 beast::insight::Collector::ptr const& collector)
566 : bytesIn(collector->make_gauge(name, "Bytes_In"))
567 , bytesOut(collector->make_gauge(name, "Bytes_Out"))
568 , messagesIn(collector->make_gauge(name, "Messages_In"))
569 , messagesOut(collector->make_gauge(name, "Messages_Out"))
570 {
571 }
576 };
577
578 struct Stats
579 {
580 template <class Handler>
582 Handler const& handler,
583 beast::insight::Collector::ptr const& collector,
584 std::vector<TrafficGauges>&& trafficGauges_)
586 collector->make_gauge("Overlay", "Peer_Disconnects"))
587 , trafficGauges(std::move(trafficGauges_))
588 , hook(collector->make_hook(handler))
589 {
590 }
591
595 };
596
599
600private:
601 void
603 {
604 auto counts = m_traffic.getCounts();
606 XRPL_ASSERT(
607 counts.size() == m_stats.trafficGauges.size(),
608 "ripple::OverlayImpl::collect_metrics : counts size do match");
609
610 for (std::size_t i = 0; i < counts.size(); ++i)
611 {
612 m_stats.trafficGauges[i].bytesIn = counts[i].bytesIn;
613 m_stats.trafficGauges[i].bytesOut = counts[i].bytesOut;
614 m_stats.trafficGauges[i].messagesIn = counts[i].messagesIn;
615 m_stats.trafficGauges[i].messagesOut = counts[i].messagesOut;
616 }
618 }
619};
620
621} // namespace ripple
622
623#endif
T bind(T... args)
Represents a JSON value.
Definition: json_value.h:148
A version-independent IP address and port combination.
Definition: IPEndpoint.h:39
A generic endpoint for log messages.
Definition: Journal.h:60
std::string const & name() const
Returns the name of this source.
A metric for measuring an integral value.
Definition: Gauge.h:40
A reference to a handler for performing polled collection.
Definition: Hook.h:32
Holds unparsed configuration information.
Definition: BasicConfig.h:218
boost::system::error_code error_code
Definition: OverlayImpl.h:83
Json::Value getUnlInfo()
Returns information about the local server's UNL.
void stop() override
static std::string makePrefix(std::uint32_t id)
PeerFinder::Manager & peerFinder()
Definition: OverlayImpl.h:161
boost::asio::ip::tcp::endpoint endpoint_type
Definition: OverlayImpl.h:82
std::atomic< uint64_t > peerDisconnects_
Definition: OverlayImpl.h:122
bool processHealth(http_request_type const &req, Handoff &handoff)
Handles health requests.
boost::asio::ip::address address_type
Definition: OverlayImpl.h:81
static bool is_upgrade(boost::beast::http::header< true, Fields > const &req)
Definition: OverlayImpl.h:320
std::condition_variable_any cond_
Definition: OverlayImpl.h:107
void onWrite(beast::PropertyStream::Map &stream) override
Subclass override.
Json::Value txMetrics() const override
Returns tx reduce-relay metrics.
Definition: OverlayImpl.h:426
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
Resolver & m_resolver
Definition: OverlayImpl.h:118
void reportTraffic(TrafficCount::category cat, bool isInbound, int bytes)
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
PeerSequence getActivePeers() const override
Returns a sequence representing the current list of peers.
void start() override
hash_map< std::shared_ptr< PeerFinder::Slot >, std::weak_ptr< PeerImp > > m_peers
Definition: OverlayImpl.h:116
void add_active(std::shared_ptr< PeerImp > const &peer)
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
Resource::Manager & m_resourceManager
Definition: OverlayImpl.h:113
std::shared_ptr< Message > manifestMessage_
Definition: OverlayImpl.h:131
std::optional< std::uint32_t > manifestListSeq_
Definition: OverlayImpl.h:133
OverlayImpl & operator=(OverlayImpl const &)=delete
TrafficCount m_traffic
Definition: OverlayImpl.h:115
void squelch(PublicKey const &validator, Peer::id_t const id, std::uint32_t squelchDuration) const override
Squelch handler.
std::shared_ptr< Writer > makeErrorResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address, std::string msg)
reduce_relay::Slots< UptimeClock > slots_
Definition: OverlayImpl.h:125
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
std::atomic< Peer::id_t > next_id_
Definition: OverlayImpl.h:119
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
Definition: OverlayImpl.h:363
boost::asio::io_service & io_service_
Definition: OverlayImpl.h:103
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
Definition: OverlayImpl.h:434
Application & app_
Definition: OverlayImpl.h:102
std::optional< std::uint32_t > networkID() const override
Returns the ID of the network this server is configured for, if any.
Definition: OverlayImpl.h:387
std::weak_ptr< Timer > timer_
Definition: OverlayImpl.h:108
std::atomic< uint64_t > jqTransOverflow_
Definition: OverlayImpl.h:121
metrics::TxMetrics txMetrics_
Definition: OverlayImpl.h:128
void broadcast(protocol::TMProposeSet &m) override
Broadcast a proposal.
void onPeerDeactivate(Peer::id_t id)
std::mutex manifestLock_
Definition: OverlayImpl.h:135
bool processRequest(http_request_type const &req, Handoff &handoff)
Handles non-peer protocol requests.
std::recursive_mutex mutex_
Definition: OverlayImpl.h:106
std::uint64_t getPeerDisconnectCharges() const override
Definition: OverlayImpl.h:381
boost::asio::ip::tcp::socket socket_type
Definition: OverlayImpl.h:80
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void sendTxQueue()
Sends, once a second, the transaction hashes aggregated by peers.
std::set< Peer::id_t > relay(protocol::TMProposeSet &m, uint256 const &uid, PublicKey const &validator) override
Relay a proposal.
std::size_t size() const override
The number of active peers on the network. Active peers are only those peers that have completed the h...
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
std::shared_ptr< Writer > makeRedirectResponse(std::shared_ptr< PeerFinder::Slot > const &slot, http_request_type const &request, address_type remote_address)
void for_each(UnaryFunc &&f) const
Definition: OverlayImpl.h:278
static bool isPeerUpgrade(boost::beast::http::response< Body > const &response)
Definition: OverlayImpl.h:310
std::optional< boost::asio::io_service::work > work_
Definition: OverlayImpl.h:104
OverlayImpl(OverlayImpl const &)=delete
Json::Value getOverlayInfo()
Returns information about peers on the overlay network.
Resource::Manager & resourceManager()
Definition: OverlayImpl.h:167
static bool isPeerUpgrade(http_request_type const &request)
Json::Value getServerCounts()
Returns information about the local server's performance counters.
boost::asio::io_service::strand strand_
Definition: OverlayImpl.h:105
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
std::unique_ptr< PeerFinder::Manager > m_peerFinder
Definition: OverlayImpl.h:114
std::uint64_t getJqTransOverflow() const override
Definition: OverlayImpl.h:357
void connect(beast::IP::Endpoint const &remote_endpoint) override
Establish a peer connection to the specified endpoint.
Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, endpoint_type remote_endpoint) override
Conditionally accept an incoming HTTP request.
Setup const & setup() const
Definition: OverlayImpl.h:173
std::atomic< uint64_t > peerDisconnectsCharges_
Definition: OverlayImpl.h:123
std::shared_ptr< Message > getManifestsMessage()
hash_map< Peer::id_t, std::weak_ptr< PeerImp > > ids_
Definition: OverlayImpl.h:117
Json::Value getServerInfo()
Returns information about the local server.
bool processValidatorList(http_request_type const &req, Handoff &handoff)
Handles validator list requests.
Json::Value json() override
Return diagnostics on the status of all peers.
std::mutex m_statsMutex
Definition: OverlayImpl.h:598
void checkTracking(std::uint32_t) override
Calls the checkTracking function on each peer.
void incPeerDisconnectCharges() override
Definition: OverlayImpl.h:375
ServerHandler & serverHandler_
Definition: OverlayImpl.h:112
bool processCrawl(http_request_type const &req, Handoff &handoff)
Handles crawl requests.
static bool is_upgrade(boost::beast::http::header< false, Fields > const &req)
Definition: OverlayImpl.h:334
int limit() override
Returns the maximum number of peers we are configured to allow.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
Definition: OverlayImpl.h:351
beast::Journal const journal_
Definition: OverlayImpl.h:111
boost::container::flat_map< Child *, std::weak_ptr< Child > > list_
Definition: OverlayImpl.h:109
std::uint64_t getPeerDisconnect() const override
Definition: OverlayImpl.h:369
Manages the set of connected peers.
Definition: Overlay.h:49
std::vector< std::shared_ptr< Peer > > PeerSequence
Definition: Overlay.h:76
Maintains a set of IP addresses used for getting into the network.
A public key.
Definition: PublicKey.h:62
Tracks load and resource consumption.
auto const & getCounts() const
An up-to-date copy of all the counters.
Definition: TrafficCount.h:197
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
Definition: overlay/Slot.h:536
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handoff.h:33
STL namespace.
T push_back(T... args)
T reserve(T... args)
Used to indicate the result of a server connection handoff.
Definition: Handoff.h:40
beast::insight::Gauge peerDisconnects
Definition: OverlayImpl.h:592
std::vector< TrafficGauges > trafficGauges
Definition: OverlayImpl.h:593
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector, std::vector< TrafficGauges > &&trafficGauges_)
Definition: OverlayImpl.h:581
beast::insight::Hook hook
Definition: OverlayImpl.h:594
void on_timer(error_code ec)
Definition: OverlayImpl.cpp:92
boost::asio::basic_waitable_timer< clock_type > timer_
Definition: OverlayImpl.h:87
beast::insight::Gauge messagesIn
Definition: OverlayImpl.h:574
beast::insight::Gauge bytesIn
Definition: OverlayImpl.h:572
beast::insight::Gauge messagesOut
Definition: OverlayImpl.h:575
TrafficGauges(char const *name, beast::insight::Collector::ptr const &collector)
Definition: OverlayImpl.h:563
beast::insight::Gauge bytesOut
Definition: OverlayImpl.h:573
std::optional< std::uint32_t > networkID
Definition: Overlay.h:72
Run transaction reduce-relay feature related metrics.
Definition: TxMetrics.h:89
void addMetrics(protocol::MessageType type, std::uint32_t val)
Add protocol message metrics.
Definition: TxMetrics.cpp:31
Json::Value json() const
Get json representation of the metrics.
Definition: TxMetrics.cpp:118