rippled
Loading...
Searching...
No Matches
PeerImp.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/consensus/RCLValidations.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionMaster.h>
25#include <xrpld/app/misc/HashRouter.h>
26#include <xrpld/app/misc/LoadFeeTrack.h>
27#include <xrpld/app/misc/NetworkOPs.h>
28#include <xrpld/app/misc/Transaction.h>
29#include <xrpld/app/misc/ValidatorList.h>
30#include <xrpld/app/tx/apply.h>
31#include <xrpld/overlay/Cluster.h>
32#include <xrpld/overlay/detail/PeerImp.h>
33#include <xrpld/overlay/detail/Tuning.h>
34#include <xrpld/perflog/PerfLog.h>
35
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/base64.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/basics/safe_cast.h>
40#include <xrpl/protocol/TxFlags.h>
41#include <xrpl/protocol/digest.h>
42
43#include <boost/algorithm/string/predicate.hpp>
44#include <boost/beast/core/ostream.hpp>
45
46#include <algorithm>
47#include <chrono>
48#include <memory>
49#include <mutex>
50#include <numeric>
51#include <sstream>
52
53using namespace std::chrono_literals;
54
55namespace ripple {
56
57namespace {
59std::chrono::milliseconds constexpr peerHighLatency{300};
60
62std::chrono::seconds constexpr peerTimerInterval{60};
63
65std::chrono::seconds constexpr shutdownTimerInterval{5};
66
67} // namespace
68
69// TODO: Remove this exclusion once unit tests are added after the hotfix
70// release.
71
73 Application& app,
74 id_t id,
76 http_request_type&& request,
77 PublicKey const& publicKey,
79 Resource::Consumer consumer,
81 OverlayImpl& overlay)
82 : Child(overlay)
83 , app_(app)
84 , id_(id)
85 , sink_(app_.journal("Peer"), makePrefix(id))
86 , p_sink_(app_.journal("Protocol"), makePrefix(id))
87 , journal_(sink_)
88 , p_journal_(p_sink_)
89 , stream_ptr_(std::move(stream_ptr))
90 , socket_(stream_ptr_->next_layer().socket())
91 , stream_(*stream_ptr_)
92 , strand_(boost::asio::make_strand(socket_.get_executor()))
93 , timer_(waitable_timer{socket_.get_executor()})
94 , remote_address_(slot->remote_endpoint())
95 , overlay_(overlay)
96 , inbound_(true)
97 , protocol_(protocol)
98 , tracking_(Tracking::unknown)
99 , trackingTime_(clock_type::now())
100 , publicKey_(publicKey)
101 , lastPingTime_(clock_type::now())
102 , creationTime_(clock_type::now())
103 , squelch_(app_.journal("Squelch"))
104 , usage_(consumer)
105 , fee_{Resource::feeTrivialPeer, ""}
106 , slot_(slot)
107 , request_(std::move(request))
108 , headers_(request_)
109 , compressionEnabled_(
111 headers_,
113 "lz4",
114 app_.config().COMPRESSION)
115 ? Compressed::On
116 : Compressed::Off)
117 , txReduceRelayEnabled_(peerFeatureEnabled(
118 headers_,
120 app_.config().TX_REDUCE_RELAY_ENABLE))
121 , ledgerReplayEnabled_(peerFeatureEnabled(
122 headers_,
124 app_.config().LEDGER_REPLAY))
125 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
126{
127 JLOG(journal_.info())
128 << "compression enabled " << (compressionEnabled_ == Compressed::On)
129 << " vp reduce-relay base squelch enabled "
131 headers_,
134 << " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
135 << remote_address_ << " " << id_;
136}
137
139{
140 bool const inCluster{cluster()};
141
146
147 if (inCluster)
148 {
149 JLOG(journal_.warn()) << name() << " left cluster";
150 }
151}
152
153// Helper function to check for valid uint256 values in protobuf buffers
154static bool
156{
157 return pBuffStr.size() == uint256::size();
158}
159
160void
162{
163 if (!strand_.running_in_this_thread())
165
166 auto parseLedgerHash =
168 if (uint256 ret; ret.parseHex(value))
169 return ret;
170
171 if (auto const s = base64_decode(value); s.size() == uint256::size())
172 return uint256{s};
173
174 return std::nullopt;
175 };
176
178 std::optional<uint256> previous;
179
180 if (auto const iter = headers_.find("Closed-Ledger");
181 iter != headers_.end())
182 {
183 closed = parseLedgerHash(iter->value());
184
185 if (!closed)
186 fail("Malformed handshake data (1)");
187 }
188
189 if (auto const iter = headers_.find("Previous-Ledger");
190 iter != headers_.end())
191 {
192 previous = parseLedgerHash(iter->value());
193
194 if (!previous)
195 fail("Malformed handshake data (2)");
196 }
197
198 if (previous && !closed)
199 fail("Malformed handshake data (3)");
200
201 {
203 if (closed)
204 closedLedgerHash_ = *closed;
205 if (previous)
206 previousLedgerHash_ = *previous;
207 }
208
209 if (inbound_)
210 doAccept();
211 else
213
214 // Anything else that needs to be done with the connection should be
215 // done in doProtocolStart
216}
217
218void
220{
221 if (!strand_.running_in_this_thread())
223
224 if (!socket_.is_open())
225 return;
226
227 // The rationale for using different severity levels is that
228 // outbound connections are under our control and may be logged
229 // at a higher level, but inbound connections are more numerous and
230 // uncontrolled so to prevent log flooding the severity is reduced.
231 JLOG(journal_.debug()) << "stop: Stop";
232
233 shutdown();
234}
235
236//------------------------------------------------------------------------------
237
238void
240{
241 if (!strand_.running_in_this_thread())
242 return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
243
244 if (!socket_.is_open())
245 return;
246
247 // we are in progress of closing the connection
248 if (shutdown_)
249 return tryAsyncShutdown();
250
251 auto validator = m->getValidatorKey();
252 if (validator && !squelch_.expireSquelch(*validator))
253 {
256 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
257 return;
258 }
259
260 // report categorized outgoing traffic
262 safe_cast<TrafficCount::category>(m->getCategory()),
263 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
264
265 // report total outgoing traffic
268 static_cast<int>(m->getBuffer(compressionEnabled_).size()));
269
270 auto sendq_size = send_queue_.size();
271
272 if (sendq_size < Tuning::targetSendQueue)
273 {
274 // To detect a peer that does not read from their
275 // side of the connection, we expect a peer to have
276 // a small senq periodically
277 large_sendq_ = 0;
278 }
279 else if (auto sink = journal_.debug();
280 sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
281 {
282 std::string const n = name();
283 sink << (n.empty() ? remote_address_.to_string() : n)
284 << " sendq: " << sendq_size;
285 }
286
287 send_queue_.push(m);
288
289 if (sendq_size != 0)
290 return;
291
292 writePending_ = true;
293 boost::asio::async_write(
294 stream_,
295 boost::asio::buffer(
296 send_queue_.front()->getBuffer(compressionEnabled_)),
297 bind_executor(
298 strand_,
299 std::bind(
302 std::placeholders::_1,
303 std::placeholders::_2)));
304}
305
306void
308{
309 if (!strand_.running_in_this_thread())
310 return post(
312
313 if (!txQueue_.empty())
314 {
315 protocol::TMHaveTransactions ht;
316 std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
317 ht.add_hashes(hash.data(), hash.size());
318 });
319 JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
320 txQueue_.clear();
321 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
322 }
323}
324
325void
327{
328 if (!strand_.running_in_this_thread())
329 return post(
331
333 {
334 JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
335 sendTxQueue();
336 }
337
338 txQueue_.insert(hash);
339 JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
340}
341
342void
344{
345 if (!strand_.running_in_this_thread())
346 return post(
347 strand_,
349
350 auto removed = txQueue_.erase(hash);
351 JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
352}
353
354void
356{
357 if ((usage_.charge(fee, context) == Resource::drop) &&
358 usage_.disconnect(p_journal_) && strand_.running_in_this_thread())
359 {
360 // Sever the connection
362 fail("charge: Resources");
363 }
364}
365
366//------------------------------------------------------------------------------
367
368bool
370{
371 auto const iter = headers_.find("Crawl");
372 if (iter == headers_.end())
373 return false;
374 return boost::iequals(iter->value(), "public");
375}
376
377bool
379{
380 return static_cast<bool>(app_.cluster().member(publicKey_));
381}
382
385{
386 if (inbound_)
387 return headers_["User-Agent"];
388 return headers_["Server"];
389}
390
393{
395
396 ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
397 ret[jss::address] = remote_address_.to_string();
398
399 if (inbound_)
400 ret[jss::inbound] = true;
401
402 if (cluster())
403 {
404 ret[jss::cluster] = true;
405
406 if (auto const n = name(); !n.empty())
407 // Could move here if Json::Value supported moving from a string
408 ret[jss::name] = n;
409 }
410
411 if (auto const d = domain(); !d.empty())
412 ret[jss::server_domain] = std::string{d};
413
414 if (auto const nid = headers_["Network-ID"]; !nid.empty())
415 ret[jss::network_id] = std::string{nid};
416
417 ret[jss::load] = usage_.balance();
418
419 if (auto const version = getVersion(); !version.empty())
420 ret[jss::version] = std::string{version};
421
422 ret[jss::protocol] = to_string(protocol_);
423
424 {
426 if (latency_)
427 ret[jss::latency] = static_cast<Json::UInt>(latency_->count());
428 }
429
430 ret[jss::uptime] = static_cast<Json::UInt>(
431 std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());
432
433 std::uint32_t minSeq, maxSeq;
434 ledgerRange(minSeq, maxSeq);
435
436 if ((minSeq != 0) || (maxSeq != 0))
437 ret[jss::complete_ledgers] =
438 std::to_string(minSeq) + " - " + std::to_string(maxSeq);
439
440 switch (tracking_.load())
441 {
443 ret[jss::track] = "diverged";
444 break;
445
447 ret[jss::track] = "unknown";
448 break;
449
451 // Nothing to do here
452 break;
453 }
454
455 uint256 closedLedgerHash;
456 protocol::TMStatusChange last_status;
457 {
459 closedLedgerHash = closedLedgerHash_;
460 last_status = last_status_;
461 }
462
463 if (closedLedgerHash != beast::zero)
464 ret[jss::ledger] = to_string(closedLedgerHash);
465
466 if (last_status.has_newstatus())
467 {
468 switch (last_status.newstatus())
469 {
470 case protocol::nsCONNECTING:
471 ret[jss::status] = "connecting";
472 break;
473
474 case protocol::nsCONNECTED:
475 ret[jss::status] = "connected";
476 break;
477
478 case protocol::nsMONITORING:
479 ret[jss::status] = "monitoring";
480 break;
481
482 case protocol::nsVALIDATING:
483 ret[jss::status] = "validating";
484 break;
485
486 case protocol::nsSHUTTING:
487 ret[jss::status] = "shutting";
488 break;
489
490 default:
491 JLOG(p_journal_.warn())
492 << "Unknown status: " << last_status.newstatus();
493 }
494 }
495
496 ret[jss::metrics] = Json::Value(Json::objectValue);
497 ret[jss::metrics][jss::total_bytes_recv] =
498 std::to_string(metrics_.recv.total_bytes());
499 ret[jss::metrics][jss::total_bytes_sent] =
500 std::to_string(metrics_.sent.total_bytes());
501 ret[jss::metrics][jss::avg_bps_recv] =
502 std::to_string(metrics_.recv.average_bytes());
503 ret[jss::metrics][jss::avg_bps_sent] =
504 std::to_string(metrics_.sent.average_bytes());
505
506 return ret;
507}
508
509bool
511{
512 switch (f)
513 {
515 return protocol_ >= make_protocol(2, 1);
517 return protocol_ >= make_protocol(2, 2);
520 }
521 return false;
522}
523
524//------------------------------------------------------------------------------
525
526bool
528{
529 {
531 if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
533 return true;
534 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
535 recentLedgers_.end())
536 return true;
537 }
538 return false;
539}
540
541void
543{
545
546 minSeq = minLedger_;
547 maxSeq = maxLedger_;
548}
549
550bool
551PeerImp::hasTxSet(uint256 const& hash) const
552{
554 return std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
555 recentTxSets_.end();
556}
557
558void
560{
561 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
562 // guarded by recentLock_.
566}
567
568bool
570{
572 return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) &&
573 (uMax <= maxLedger_);
574}
575
576//------------------------------------------------------------------------------
577
578void
580{
581 XRPL_ASSERT(
582 strand_.running_in_this_thread(),
583 "ripple::PeerImp::fail : strand in this thread");
584
585 if (!socket_.is_open())
586 return;
587
588 JLOG(journal_.warn()) << name << " from "
590 << " at " << remote_address_.to_string() << ": "
591 << ec.message();
592
593 shutdown();
594}
595
596void
598{
599 if (!strand_.running_in_this_thread())
600 return post(
601 strand_,
602 std::bind(
603 (void(Peer::*)(std::string const&)) & PeerImp::fail,
605 reason));
606
607 if (!socket_.is_open())
608 return;
609
610 // Call to name() locks, log only if the message will be outputed
612 {
613 std::string const n = name();
614 JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
615 << " failed: " << reason;
616 }
617
618 shutdown();
619}
620
621void
623{
624 XRPL_ASSERT(
625 strand_.running_in_this_thread(),
626 "ripple::PeerImp::tryAsyncShutdown : strand in this thread");
627
629 return;
630
632 return;
633
634 shutdownStarted_ = true;
635
636 setTimer(shutdownTimerInterval);
637
638 // gracefully shutdown the SSL socket, performing a shutdown handshake
639 stream_.async_shutdown(bind_executor(
640 strand_,
641 std::bind(
642 &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
643}
644
645void
647{
648 XRPL_ASSERT(
649 strand_.running_in_this_thread(),
650 "ripple::PeerImp::shutdown: strand in this thread");
651
652 if (!socket_.is_open() || shutdown_)
653 return;
654
655 shutdown_ = true;
656
657 boost::beast::get_lowest_layer(stream_).cancel();
658
660}
661
662void
664{
665 cancelTimer();
666 if (ec)
667 {
668 // - eof: the stream was cleanly closed
669 // - operation_aborted: an expired timer (slow shutdown)
670 // - stream_truncated: the tcp connection closed (no handshake) it could
671 // occur if a peer does not perform a graceful disconnect
672 // - broken_pipe: the peer is gone
673 bool shouldLog =
674 (ec != boost::asio::error::eof &&
675 ec != boost::asio::error::operation_aborted &&
676 ec.message().find("application data after close notify") ==
677 std::string::npos);
678
679 if (shouldLog)
680 {
681 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
682 }
683 }
684
685 close();
686}
687
688void
690{
691 XRPL_ASSERT(
692 strand_.running_in_this_thread(),
693 "ripple::PeerImp::close : strand in this thread");
694
695 if (!socket_.is_open())
696 return;
697
698 cancelTimer();
699
700 error_code ec;
701 socket_.close(ec);
702
704
705 // The rationale for using different severity levels is that
706 // outbound connections are under our control and may be logged
707 // at a higher level, but inbound connections are more numerous and
708 // uncontrolled so to prevent log flooding the severity is reduced.
709 JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
710}
711
712//------------------------------------------------------------------------------
713
714void
716{
717 try
718 {
719 timer_.expires_after(interval);
720 }
721 catch (std::exception const& ex)
722 {
723 JLOG(journal_.error()) << "setTimer: " << ex.what();
724 return shutdown();
725 }
726
727 timer_.async_wait(bind_executor(
728 strand_,
729 std::bind(
730 &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
731}
732
733void
735{
736 XRPL_ASSERT(
737 strand_.running_in_this_thread(),
738 "ripple::PeerImp::onTimer : strand in this thread");
739
740 if (!socket_.is_open())
741 return;
742
743 if (ec)
744 {
745 // do not initiate shutdown, timers are frequently cancelled
746 if (ec == boost::asio::error::operation_aborted)
747 return;
748
749 // This should never happen
750 JLOG(journal_.error()) << "onTimer: " << ec.message();
751 return close();
752 }
753
754 // the timer expired before the shutdown completed
755 // force close the connection
756 if (shutdown_)
757 {
758 JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
759 return close();
760 }
761
763 return fail("Large send queue");
764
765 if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
766 {
767 clock_type::duration duration;
768
769 {
771 duration = clock_type::now() - trackingTime_;
772 }
773
774 if ((t == Tracking::diverged &&
775 (duration > app_.config().MAX_DIVERGED_TIME)) ||
776 (t == Tracking::unknown &&
777 (duration > app_.config().MAX_UNKNOWN_TIME)))
778 {
780 return fail("Not useful");
781 }
782 }
783
784 // Already waiting for PONG
785 if (lastPingSeq_)
786 return fail("Ping Timeout");
787
789 lastPingSeq_ = rand_int<std::uint32_t>();
790
791 protocol::TMPing message;
792 message.set_type(protocol::TMPing::ptPING);
793 message.set_seq(*lastPingSeq_);
794
795 send(std::make_shared<Message>(message, protocol::mtPING));
796
797 setTimer(peerTimerInterval);
798}
799
800void
802{
803 try
804 {
805 timer_.cancel();
806 }
807 catch (std::exception const& ex)
808 {
809 JLOG(journal_.error()) << "cancelTimer: " << ex.what();
810 }
811}
812
815{
817 ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
818 return ss.str();
819}
820
821//------------------------------------------------------------------------------
822void
824{
825 XRPL_ASSERT(
826 read_buffer_.size() == 0,
827 "ripple::PeerImp::doAccept : empty read buffer");
828
829 JLOG(journal_.debug()) << "doAccept: " << remote_address_;
830
831 // a shutdown was initiated before the handshake, there is nothing to do
832 if (shutdown_)
833 return tryAsyncShutdown();
834
835 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
836
837 // This shouldn't fail since we already computed
838 // the shared value successfully in OverlayImpl
839 if (!sharedValue)
840 return fail("makeSharedValue: Unexpected failure");
841
842 JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);
843 JLOG(journal_.info()) << "Public Key: "
845
846 if (auto member = app_.cluster().member(publicKey_))
847 {
848 {
850 name_ = *member;
851 }
852 JLOG(journal_.info()) << "Cluster name: " << *member;
853 }
854
856
857 // XXX Set timer: connection is in grace period to be useful.
858 // XXX Set timer: connection idle (idle may vary depending on connection
859 // type.)
860
862
863 boost::beast::ostream(*write_buffer) << makeResponse(
865 request_,
868 *sharedValue,
870 protocol_,
871 app_);
872
873 // Write the whole buffer and only start protocol when that's done.
874 boost::asio::async_write(
875 stream_,
876 write_buffer->data(),
877 boost::asio::transfer_all(),
878 bind_executor(
879 strand_,
880 [this, write_buffer, self = shared_from_this()](
881 error_code ec, std::size_t bytes_transferred) {
882 if (!socket_.is_open())
883 return;
884 if (ec == boost::asio::error::operation_aborted)
885 return tryAsyncShutdown();
886 if (ec)
887 return fail("onWriteResponse", ec);
888 if (write_buffer->size() == bytes_transferred)
889 return doProtocolStart();
890 return fail("Failed to write header");
891 }));
892}
893
896{
897 std::shared_lock read_lock{nameMutex_};
898 return name_;
899}
900
903{
904 return headers_["Server-Domain"];
905}
906
907//------------------------------------------------------------------------------
908
909// Protocol logic
910
911void
913{
914 // a shutdown was initiated before the handshare, there is nothing to do
915 if (shutdown_)
916 return tryAsyncShutdown();
917
919
920 // Send all the validator lists that have been loaded
922 {
924 [&](std::string const& manifest,
925 std::uint32_t version,
927 PublicKey const& pubKey,
928 std::size_t maxSequence,
929 uint256 const& hash) {
931 *this,
932 0,
933 pubKey,
934 maxSequence,
935 version,
936 manifest,
937 blobInfos,
939 p_journal_);
940
941 // Don't send it next time.
943 });
944 }
945
946 if (auto m = overlay_.getManifestsMessage())
947 send(m);
948
949 setTimer(peerTimerInterval);
950}
951
952// Called repeatedly with protocol message data
953void
955{
956 XRPL_ASSERT(
957 strand_.running_in_this_thread(),
958 "ripple::PeerImp::onReadMessage : strand in this thread");
959
960 readPending_ = false;
961
962 if (!socket_.is_open())
963 return;
964
965 if (ec)
966 {
967 if (ec == boost::asio::error::eof)
968 {
969 JLOG(journal_.debug()) << "EOF";
970 return shutdown();
971 }
972
973 if (ec == boost::asio::error::operation_aborted)
974 return tryAsyncShutdown();
975
976 return fail("onReadMessage", ec);
977 }
978 // we started shutdown, no reason to process further data
979 if (shutdown_)
980 return tryAsyncShutdown();
981
982 if (auto stream = journal_.trace())
983 {
984 stream << "onReadMessage: "
985 << (bytes_transferred > 0
986 ? to_string(bytes_transferred) + " bytes"
987 : "");
988 }
989
990 metrics_.recv.add_message(bytes_transferred);
991
992 read_buffer_.commit(bytes_transferred);
993
994 auto hint = Tuning::readBufferBytes;
995
996 while (read_buffer_.size() > 0)
997 {
998 std::size_t bytes_consumed;
999
1000 using namespace std::chrono_literals;
1001 std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
1002 [&]() {
1003 return invokeProtocolMessage(read_buffer_.data(), *this, hint);
1004 },
1005 "invokeProtocolMessage",
1006 350ms,
1007 journal_);
1008
1009 if (!socket_.is_open())
1010 return;
1011
1012 // the error_code is produced by invokeProtocolMessage
1013 // it could be due to a bad message
1014 if (ec)
1015 return fail("onReadMessage", ec);
1016
1017 if (bytes_consumed == 0)
1018 break;
1019
1020 read_buffer_.consume(bytes_consumed);
1021 }
1022
1023 // check if a shutdown was initiated while processing messages
1024 if (shutdown_)
1025 return tryAsyncShutdown();
1026
1027 readPending_ = true;
1028
1029 XRPL_ASSERT(
1030 !shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started");
1031
1032 // Timeout on writes only
1033 stream_.async_read_some(
1035 bind_executor(
1036 strand_,
1037 std::bind(
1040 std::placeholders::_1,
1041 std::placeholders::_2)));
1042}
1043
1044void
1046{
1047 XRPL_ASSERT(
1048 strand_.running_in_this_thread(),
1049 "ripple::PeerImp::onWriteMessage : strand in this thread");
1050
1051 writePending_ = false;
1052
1053 if (!socket_.is_open())
1054 return;
1055
1056 if (ec)
1057 {
1058 if (ec == boost::asio::error::operation_aborted)
1059 return tryAsyncShutdown();
1060
1061 return fail("onWriteMessage", ec);
1062 }
1063
1064 if (auto stream = journal_.trace())
1065 {
1066 stream << "onWriteMessage: "
1067 << (bytes_transferred > 0
1068 ? to_string(bytes_transferred) + " bytes"
1069 : "");
1070 }
1071
1072 metrics_.sent.add_message(bytes_transferred);
1073
1074 XRPL_ASSERT(
1075 !send_queue_.empty(),
1076 "ripple::PeerImp::onWriteMessage : non-empty send buffer");
1077 send_queue_.pop();
1078
1079 if (shutdown_)
1080 return tryAsyncShutdown();
1081
1082 if (!send_queue_.empty())
1083 {
1084 writePending_ = true;
1085 XRPL_ASSERT(
1087 "ripple::PeerImp::onWriteMessage : shutdown started");
1088
1089 // Timeout on writes only
1090 return boost::asio::async_write(
1091 stream_,
1092 boost::asio::buffer(
1093 send_queue_.front()->getBuffer(compressionEnabled_)),
1094 bind_executor(
1095 strand_,
1096 std::bind(
1099 std::placeholders::_1,
1100 std::placeholders::_2)));
1101 }
1102}
1103
1104//------------------------------------------------------------------------------
1105//
1106// ProtocolHandler
1107//
1108//------------------------------------------------------------------------------
1109
1110void
1112{
1113 // TODO
1114}
1115
1116void
1118 std::uint16_t type,
1120 std::size_t size,
1121 std::size_t uncompressed_size,
1122 bool isCompressed)
1123{
1124 auto const name = protocolMessageName(type);
1127
1128 auto const category = TrafficCount::categorize(
1129 *m, static_cast<protocol::MessageType>(type), true);
1130
1131 // report total incoming traffic
1133 TrafficCount::category::total, static_cast<int>(size));
1134
1135 // increase the traffic received for a specific category
1136 overlay_.reportInboundTraffic(category, static_cast<int>(size));
1137
1138 using namespace protocol;
1139 if ((type == MessageType::mtTRANSACTION ||
1140 type == MessageType::mtHAVE_TRANSACTIONS ||
1141 type == MessageType::mtTRANSACTIONS ||
1142 // GET_OBJECTS
1144 // GET_LEDGER
1147 // LEDGER_DATA
1151 {
1153 static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
1154 }
1155 JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
1156 << uncompressed_size << " " << isCompressed;
1157}
1158
1159void
1167
1168void
1170{
1171 auto const s = m->list_size();
1172
1173 if (s == 0)
1174 {
1176 return;
1177 }
1178
1179 if (s > 100)
1181
1183 jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
1184 overlay_.onManifests(m, that);
1185 });
1186}
1187
1188void
1190{
1191 if (m->type() == protocol::TMPing::ptPING)
1192 {
1193 // We have received a ping request, reply with a pong
1195 m->set_type(protocol::TMPing::ptPONG);
1196 send(std::make_shared<Message>(*m, protocol::mtPING));
1197 return;
1198 }
1199
1200 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1201 {
1202 // Only reset the ping sequence if we actually received a
1203 // PONG with the correct cookie. That way, any peers which
1204 // respond with incorrect cookies will eventually time out.
1205 if (m->seq() == lastPingSeq_)
1206 {
1208
1209 // Update latency estimate
1210 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1212
1214
1215 if (latency_)
1216 latency_ = (*latency_ * 7 + rtt) / 8;
1217 else
1218 latency_ = rtt;
1219 }
1220
1221 return;
1222 }
1223}
1224
1225void
1227{
1228 // VFALCO NOTE I think we should drop the peer immediately
1229 if (!cluster())
1230 {
1231 fee_.update(Resource::feeUselessData, "unknown cluster");
1232 return;
1233 }
1234
1235 for (int i = 0; i < m->clusternodes().size(); ++i)
1236 {
1237 protocol::TMClusterNode const& node = m->clusternodes(i);
1238
1240 if (node.has_nodename())
1241 name = node.nodename();
1242
1243 auto const publicKey =
1244 parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
1245
1246 // NIKB NOTE We should drop the peer immediately if
1247 // they send us a public key we can't parse
1248 if (publicKey)
1249 {
1250 auto const reportTime =
1251 NetClock::time_point{NetClock::duration{node.reporttime()}};
1252
1254 *publicKey, name, node.nodeload(), reportTime);
1255 }
1256 }
1257
1258 int loadSources = m->loadsources().size();
1259 if (loadSources != 0)
1260 {
1261 Resource::Gossip gossip;
1262 gossip.items.reserve(loadSources);
1263 for (int i = 0; i < m->loadsources().size(); ++i)
1264 {
1265 protocol::TMLoadSource const& node = m->loadsources(i);
1267 item.address = beast::IP::Endpoint::from_string(node.name());
1268 item.balance = node.cost();
1269 if (item.address != beast::IP::Endpoint())
1270 gossip.items.push_back(item);
1271 }
1273 }
1274
1275 // Calculate the cluster fee:
1276 auto const thresh = app_.timeKeeper().now() - 90s;
1277 std::uint32_t clusterFee = 0;
1278
1280 fees.reserve(app_.cluster().size());
1281
1282 app_.cluster().for_each([&fees, thresh](ClusterNode const& status) {
1283 if (status.getReportTime() >= thresh)
1284 fees.push_back(status.getLoadFee());
1285 });
1286
1287 if (!fees.empty())
1288 {
1289 auto const index = fees.size() / 2;
1290 std::nth_element(fees.begin(), fees.begin() + index, fees.end());
1291 clusterFee = fees[index];
1292 }
1293
1294 app_.getFeeTrack().setClusterFee(clusterFee);
1295}
1296
1297void
1299{
1300 // Don't allow endpoints from peers that are not known tracking or are
1301 // not using a version of the message that we support:
1302 if (tracking_.load() != Tracking::converged || m->version() != 2)
1303 return;
1304
1305 // The number is arbitrary and doesn't have any real significance or
1306 // implication for the protocol.
1307 if (m->endpoints_v2().size() >= 1024)
1308 {
1309 fee_.update(Resource::feeUselessData, "endpoints too large");
1310 return;
1311 }
1312
1314 endpoints.reserve(m->endpoints_v2().size());
1315
1316 auto malformed = 0;
1317 for (auto const& tm : m->endpoints_v2())
1318 {
1319 auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
1320
1321 if (!result)
1322 {
1323 JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
1324 << tm.endpoint() << "}";
1325 malformed++;
1326 continue;
1327 }
1328
1329 // If hops == 0, this Endpoint describes the peer we are connected
1330 // to -- in that case, we take the remote address seen on the
1331 // socket and store that in the IP::Endpoint. If this is the first
1332 // time, then we'll verify that their listener can receive incoming
1333 // by performing a connectivity test. if hops > 0, then we just
1334 // take the address/port we were given
1335 if (tm.hops() == 0)
1336 result = remote_address_.at_port(result->port());
1337
1338 endpoints.emplace_back(*result, tm.hops());
1339 }
1340
1341 // Charge the peer for each malformed endpoint. As there still may be
1342 // multiple valid endpoints we don't return early.
1343 if (malformed > 0)
1344 {
1345 fee_.update(
1346 Resource::feeInvalidData * malformed,
1347 std::to_string(malformed) + " malformed endpoints");
1348 }
1349
1350 if (!endpoints.empty())
1351 overlay_.peerFinder().on_endpoints(slot_, endpoints);
1352}
1353
1354void
1359
1360void
1363 bool eraseTxQueue,
1364 bool batch)
1365{
1366 XRPL_ASSERT(
1367 eraseTxQueue != batch,
1368 ("ripple::PeerImp::handleTransaction : valid inputs"));
1370 return;
1371
1373 {
1374 // If we've never been in synch, there's nothing we can do
1375 // with a transaction
1376 JLOG(p_journal_.debug()) << "Ignoring incoming transaction: "
1377 << "Need network ledger";
1378 return;
1379 }
1380
1381 SerialIter sit(makeSlice(m->rawtransaction()));
1382
1383 try
1384 {
1385 auto stx = std::make_shared<STTx const>(sit);
1386 uint256 txID = stx->getTransactionID();
1387
1388 // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
1389 // LCOV_EXCL_START
1390 if (stx->isFlag(tfInnerBatchTxn) &&
1391 getCurrentTransactionRules()->enabled(featureBatch))
1392 {
1393 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
1394 "tfInnerBatchTxn (handleTransaction).";
1395 fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
1396 return;
1397 }
1398 // LCOV_EXCL_STOP
1399
1400 HashRouterFlags flags;
1401 constexpr std::chrono::seconds tx_interval = 10s;
1402
1403 if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
1404 {
1405 // we have seen this transaction recently
1406 if (any(flags & HashRouterFlags::BAD))
1407 {
1408 fee_.update(Resource::feeUselessData, "known bad");
1409 JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
1410 }
1411
1412 // Erase only if the server has seen this tx. If the server has not
1413 // seen this tx then the tx could not has been queued for this peer.
1414 else if (eraseTxQueue && txReduceRelayEnabled())
1415 removeTxQueue(txID);
1416
1420
1421 return;
1422 }
1423
1424 JLOG(p_journal_.debug()) << "Got tx " << txID;
1425
1426 bool checkSignature = true;
1427 if (cluster())
1428 {
1429 if (!m->has_deferred() || !m->deferred())
1430 {
1431 // Skip local checks if a server we trust
1432 // put the transaction in its open ledger
1433 flags |= HashRouterFlags::TRUSTED;
1434 }
1435
1436 // for non-validator nodes only -- localPublicKey is set for
1437 // validators only
1439 {
1440 // For now, be paranoid and have each validator
1441 // check each transaction, regardless of source
1442 checkSignature = false;
1443 }
1444 }
1445
1447 {
1448 JLOG(p_journal_.trace())
1449 << "No new transactions until synchronized";
1450 }
1451 else if (
1454 {
1456 JLOG(p_journal_.info()) << "Transaction queue is full";
1457 }
1458 else
1459 {
1462 "recvTransaction->checkTransaction",
1464 flags,
1465 checkSignature,
1466 batch,
1467 stx]() {
1468 if (auto peer = weak.lock())
1469 peer->checkTransaction(
1470 flags, checkSignature, stx, batch);
1471 });
1472 }
1473 }
1474 catch (std::exception const& ex)
1475 {
1476 JLOG(p_journal_.warn())
1477 << "Transaction invalid: " << strHex(m->rawtransaction())
1478 << ". Exception: " << ex.what();
1479 }
1480}
1481
1482void
1484{
1485 auto badData = [&](std::string const& msg) {
1486 fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
1487 JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
1488 };
1489 auto const itype{m->itype()};
1490
1491 // Verify ledger info type
1492 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1493 return badData("Invalid ledger info type");
1494
1495 auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
1496 if (m->has_ltype())
1497 return m->ltype();
1498 return std::nullopt;
1499 }();
1500
1501 if (itype == protocol::liTS_CANDIDATE)
1502 {
1503 if (!m->has_ledgerhash())
1504 return badData("Invalid TX candidate set, missing TX set hash");
1505 }
1506 else if (
1507 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1508 !(ltype && *ltype == protocol::ltCLOSED))
1509 {
1510 return badData("Invalid request");
1511 }
1512
1513 // Verify ledger type
1514 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1515 return badData("Invalid ledger type");
1516
1517 // Verify ledger hash
1518 if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
1519 return badData("Invalid ledger hash");
1520
1521 // Verify ledger sequence
1522 if (m->has_ledgerseq())
1523 {
1524 auto const ledgerSeq{m->ledgerseq()};
1525
1526 // Check if within a reasonable range
1527 using namespace std::chrono_literals;
1529 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1530 {
1531 return badData(
1532 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1533 }
1534 }
1535
1536 // Verify ledger node IDs
1537 if (itype != protocol::liBASE)
1538 {
1539 if (m->nodeids_size() <= 0)
1540 return badData("Invalid ledger node IDs");
1541
1542 for (auto const& nodeId : m->nodeids())
1543 {
1545 return badData("Invalid SHAMap node ID");
1546 }
1547 }
1548
1549 // Verify query type
1550 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1551 return badData("Invalid query type");
1552
1553 // Verify query depth
1554 if (m->has_querydepth())
1555 {
1556 if (m->querydepth() > Tuning::maxQueryDepth ||
1557 itype == protocol::liBASE)
1558 {
1559 return badData("Invalid query depth");
1560 }
1561 }
1562
1563 // Queue a job to process the request
1565 app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
1566 if (auto peer = weak.lock())
1567 peer->processLedgerRequest(m);
1568 });
1569}
1570
1571void
1573{
1574 JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
1576 {
1577 fee_.update(
1578 Resource::feeMalformedRequest, "proof_path_request disabled");
1579 return;
1580 }
1581
1582 fee_.update(
1583 Resource::feeModerateBurdenPeer, "received a proof path request");
1586 jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
1587 if (auto peer = weak.lock())
1588 {
1589 auto reply =
1590 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1591 if (reply.has_error())
1592 {
1593 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1594 peer->charge(
1595 Resource::feeMalformedRequest,
1596 "proof_path_request");
1597 else
1598 peer->charge(
1599 Resource::feeRequestNoReply, "proof_path_request");
1600 }
1601 else
1602 {
1603 peer->send(std::make_shared<Message>(
1604 reply, protocol::mtPROOF_PATH_RESPONSE));
1605 }
1606 }
1607 });
1608}
1609
1610void
1612{
1613 if (!ledgerReplayEnabled_)
1614 {
1615 fee_.update(
1616 Resource::feeMalformedRequest, "proof_path_response disabled");
1617 return;
1618 }
1619
1620 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1621 {
1622 fee_.update(Resource::feeInvalidData, "proof_path_response");
1623 }
1624}
1625
1626void
1628{
1629 JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
1630 if (!ledgerReplayEnabled_)
1631 {
1632 fee_.update(
1633 Resource::feeMalformedRequest, "replay_delta_request disabled");
1634 return;
1635 }
1636
1637 fee_.fee = Resource::feeModerateBurdenPeer;
1638 std::weak_ptr<PeerImp> weak = shared_from_this();
1639 app_.getJobQueue().addJob(
1640 jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
1641 if (auto peer = weak.lock())
1642 {
1643 auto reply =
1644 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1645 if (reply.has_error())
1646 {
1647 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1648 peer->charge(
1649 Resource::feeMalformedRequest,
1650 "replay_delta_request");
1651 else
1652 peer->charge(
1653 Resource::feeRequestNoReply,
1654 "replay_delta_request");
1655 }
1656 else
1657 {
1658 peer->send(std::make_shared<Message>(
1659 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1660 }
1661 }
1662 });
1663}
1664
1665void
1667{
1668 if (!ledgerReplayEnabled_)
1669 {
1670 fee_.update(
1671 Resource::feeMalformedRequest, "replay_delta_response disabled");
1672 return;
1673 }
1674
1675 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1676 {
1677 fee_.update(Resource::feeInvalidData, "replay_delta_response");
1678 }
1679}
1680
1681void
1683{
1684 auto badData = [&](std::string const& msg) {
1685 fee_.update(Resource::feeInvalidData, msg);
1686 JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
1687 };
1688
1689 // Verify ledger hash
1690 if (!stringIsUint256Sized(m->ledgerhash()))
1691 return badData("Invalid ledger hash");
1692
1693 // Verify ledger sequence
1694 {
1695 auto const ledgerSeq{m->ledgerseq()};
1696 if (m->type() == protocol::liTS_CANDIDATE)
1697 {
1698 if (ledgerSeq != 0)
1699 {
1700 return badData(
1701 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1702 }
1703 }
1704 else
1705 {
1706 // Check if within a reasonable range
1707 using namespace std::chrono_literals;
1708 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1709 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1710 {
1711 return badData(
1712 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1713 }
1714 }
1715 }
1716
1717 // Verify ledger info type
1718 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1719 return badData("Invalid ledger info type");
1720
1721 // Verify reply error
1722 if (m->has_error() &&
1723 (m->error() < protocol::reNO_LEDGER ||
1724 m->error() > protocol::reBAD_REQUEST))
1725 {
1726 return badData("Invalid reply error");
1727 }
1728
1729 // Verify ledger nodes.
1730 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1731 {
1732 return badData(
1733 "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
1734 }
1735
1736 // If there is a request cookie, attempt to relay the message
1737 if (m->has_requestcookie())
1738 {
1739 if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1740 {
1741 m->clear_requestcookie();
1742 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1743 }
1744 else
1745 {
1746 JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
1747 }
1748 return;
1749 }
1750
1751 uint256 const ledgerHash{m->ledgerhash()};
1752
1753 // Otherwise check if received data for a candidate transaction set
1754 if (m->type() == protocol::liTS_CANDIDATE)
1755 {
1756 std::weak_ptr<PeerImp> weak{shared_from_this()};
1757 app_.getJobQueue().addJob(
1758 jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
1759 if (auto peer = weak.lock())
1760 {
1761 peer->app_.getInboundTransactions().gotData(
1762 ledgerHash, peer, m);
1763 }
1764 });
1765 return;
1766 }
1767
1768 // Consume the message
1769 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1770}
1771
1772void
1774{
1775 protocol::TMProposeSet& set = *m;
1776
1777 auto const sig = makeSlice(set.signature());
1778
1779 // Preliminary check for the validity of the signature: A DER encoded
1780 // signature can't be longer than 72 bytes.
1781 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1782 (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
1783 {
1784 JLOG(p_journal_.warn()) << "Proposal: malformed";
1785 fee_.update(
1786 Resource::feeInvalidSignature,
1787 " signature can't be longer than 72 bytes");
1788 return;
1789 }
1790
1791 if (!stringIsUint256Sized(set.currenttxhash()) ||
1792 !stringIsUint256Sized(set.previousledger()))
1793 {
1794 JLOG(p_journal_.warn()) << "Proposal: malformed";
1795 fee_.update(Resource::feeMalformedRequest, "bad hashes");
1796 return;
1797 }
1798
1799 // RH TODO: when isTrusted = false we should probably also cache a key
1800 // suppression for 30 seconds to avoid doing a relatively expensive lookup
1801 // every time a spam packet is received
1802 PublicKey const publicKey{makeSlice(set.nodepubkey())};
1803 auto const isTrusted = app_.validators().trusted(publicKey);
1804
1805 // If the operator has specified that untrusted proposals be dropped then
1806 // this happens here I.e. before further wasting CPU verifying the signature
1807 // of an untrusted key
1808 if (!isTrusted)
1809 {
1810 // report untrusted proposal messages
1811 overlay_.reportInboundTraffic(
1812 TrafficCount::category::proposal_untrusted,
1813 Message::messageSize(*m));
1814
1815 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1816 return;
1817 }
1818
1819 uint256 const proposeHash{set.currenttxhash()};
1820 uint256 const prevLedger{set.previousledger()};
1821
1822 NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
1823
1824 uint256 const suppression = proposalUniqueId(
1825 proposeHash,
1826 prevLedger,
1827 set.proposeseq(),
1828 closeTime,
1829 publicKey.slice(),
1830 sig);
1831
1832 if (auto [added, relayed] =
1833 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1834 !added)
1835 {
1836 // Count unique messages (Slots has it's own 'HashRouter'), which a peer
1837 // receives within IDLED seconds since the message has been relayed.
1838 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
1839 overlay_.updateSlotAndSquelch(
1840 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1841
1842 // report duplicate proposal messages
1843 overlay_.reportInboundTraffic(
1844 TrafficCount::category::proposal_duplicate,
1845 Message::messageSize(*m));
1846
1847 JLOG(p_journal_.trace()) << "Proposal: duplicate";
1848
1849 return;
1850 }
1851
1852 if (!isTrusted)
1853 {
1854 if (tracking_.load() == Tracking::diverged)
1855 {
1856 JLOG(p_journal_.debug())
1857 << "Proposal: Dropping untrusted (peer divergence)";
1858 return;
1859 }
1860
1861 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1862 {
1863 JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
1864 return;
1865 }
1866 }
1867
1868 JLOG(p_journal_.trace())
1869 << "Proposal: " << (isTrusted ? "trusted" : "untrusted");
1870
1871 auto proposal = RCLCxPeerPos(
1872 publicKey,
1873 sig,
1874 suppression,
1876 prevLedger,
1877 set.proposeseq(),
1878 proposeHash,
1879 closeTime,
1880 app_.timeKeeper().closeTime(),
1881 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1882
1883 std::weak_ptr<PeerImp> weak = shared_from_this();
1884 app_.getJobQueue().addJob(
1885 isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
1886 "recvPropose->checkPropose",
1887 [weak, isTrusted, m, proposal]() {
1888 if (auto peer = weak.lock())
1889 peer->checkPropose(isTrusted, m, proposal);
1890 });
1891}
1892
1893void
1895{
1896 JLOG(p_journal_.trace()) << "Status: Change";
1897
1898 if (!m->has_networktime())
1899 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1900
1901 {
1902 std::lock_guard sl(recentLock_);
1903 if (!last_status_.has_newstatus() || m->has_newstatus())
1904 last_status_ = *m;
1905 else
1906 {
1907 // preserve old status
1908 protocol::NodeStatus status = last_status_.newstatus();
1909 last_status_ = *m;
1910 m->set_newstatus(status);
1911 }
1912 }
1913
1914 if (m->newevent() == protocol::neLOST_SYNC)
1915 {
1916 bool outOfSync{false};
1917 {
1918 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1919 // guarded by recentLock_.
1920 std::lock_guard sl(recentLock_);
1921 if (!closedLedgerHash_.isZero())
1922 {
1923 outOfSync = true;
1924 closedLedgerHash_.zero();
1925 }
1926 previousLedgerHash_.zero();
1927 }
1928 if (outOfSync)
1929 {
1930 JLOG(p_journal_.debug()) << "Status: Out of sync";
1931 }
1932 return;
1933 }
1934
1935 {
1936 uint256 closedLedgerHash{};
1937 bool const peerChangedLedgers{
1938 m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
1939
1940 {
1941 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1942 // guarded by recentLock_.
1943 std::lock_guard sl(recentLock_);
1944 if (peerChangedLedgers)
1945 {
1946 closedLedgerHash_ = m->ledgerhash();
1947 closedLedgerHash = closedLedgerHash_;
1948 addLedger(closedLedgerHash, sl);
1949 }
1950 else
1951 {
1952 closedLedgerHash_.zero();
1953 }
1954
1955 if (m->has_ledgerhashprevious() &&
1956 stringIsUint256Sized(m->ledgerhashprevious()))
1957 {
1958 previousLedgerHash_ = m->ledgerhashprevious();
1959 addLedger(previousLedgerHash_, sl);
1960 }
1961 else
1962 {
1963 previousLedgerHash_.zero();
1964 }
1965 }
1966 if (peerChangedLedgers)
1967 {
1968 JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
1969 }
1970 else
1971 {
1972 JLOG(p_journal_.debug()) << "Status: No ledger";
1973 }
1974 }
1975
1976 if (m->has_firstseq() && m->has_lastseq())
1977 {
1978 std::lock_guard sl(recentLock_);
1979
1980 minLedger_ = m->firstseq();
1981 maxLedger_ = m->lastseq();
1982
1983 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1984 minLedger_ = maxLedger_ = 0;
1985 }
1986
1987 if (m->has_ledgerseq() &&
1988 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1989 {
1990 checkTracking(
1991 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1992 }
1993
1994 app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {
1996
1997 if (m->has_newstatus())
1998 {
1999 switch (m->newstatus())
2000 {
2001 case protocol::nsCONNECTING:
2002 j[jss::status] = "CONNECTING";
2003 break;
2004 case protocol::nsCONNECTED:
2005 j[jss::status] = "CONNECTED";
2006 break;
2007 case protocol::nsMONITORING:
2008 j[jss::status] = "MONITORING";
2009 break;
2010 case protocol::nsVALIDATING:
2011 j[jss::status] = "VALIDATING";
2012 break;
2013 case protocol::nsSHUTTING:
2014 j[jss::status] = "SHUTTING";
2015 break;
2016 }
2017 }
2018
2019 if (m->has_newevent())
2020 {
2021 switch (m->newevent())
2022 {
2023 case protocol::neCLOSING_LEDGER:
2024 j[jss::action] = "CLOSING_LEDGER";
2025 break;
2026 case protocol::neACCEPTED_LEDGER:
2027 j[jss::action] = "ACCEPTED_LEDGER";
2028 break;
2029 case protocol::neSWITCHED_LEDGER:
2030 j[jss::action] = "SWITCHED_LEDGER";
2031 break;
2032 case protocol::neLOST_SYNC:
2033 j[jss::action] = "LOST_SYNC";
2034 break;
2035 }
2036 }
2037
2038 if (m->has_ledgerseq())
2039 {
2040 j[jss::ledger_index] = m->ledgerseq();
2041 }
2042
2043 if (m->has_ledgerhash())
2044 {
2045 uint256 closedLedgerHash{};
2046 {
2047 std::lock_guard sl(recentLock_);
2048 closedLedgerHash = closedLedgerHash_;
2049 }
2050 j[jss::ledger_hash] = to_string(closedLedgerHash);
2051 }
2052
2053 if (m->has_networktime())
2054 {
2055 j[jss::date] = Json::UInt(m->networktime());
2056 }
2057
2058 if (m->has_firstseq() && m->has_lastseq())
2059 {
2060 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
2061 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
2062 }
2063
2064 return j;
2065 });
2066}
2067
2068void
2069PeerImp::checkTracking(std::uint32_t validationSeq)
2070{
2071 std::uint32_t serverSeq;
2072 {
2073 // Extract the sequence number of the highest
2074 // ledger this peer has
2075 std::lock_guard sl(recentLock_);
2076
2077 serverSeq = maxLedger_;
2078 }
2079 if (serverSeq != 0)
2080 {
2081 // Compare the peer's ledger sequence to the
2082 // sequence of a recently-validated ledger
2083 checkTracking(serverSeq, validationSeq);
2084 }
2085}
2086
2087void
2088PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
2089{
2090 int diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2091
2092 if (diff < Tuning::convergedLedgerLimit)
2093 {
2094 // The peer's ledger sequence is close to the validation's
2095 tracking_ = Tracking::converged;
2096 }
2097
2098 if ((diff > Tuning::divergedLedgerLimit) &&
2099 (tracking_.load() != Tracking::diverged))
2100 {
2101 // The peer's ledger sequence is way off the validation's
2102 std::lock_guard sl(recentLock_);
2103
2104 tracking_ = Tracking::diverged;
2105 trackingTime_ = clock_type::now();
2106 }
2107}
2108
2109void
2111{
2112 if (!stringIsUint256Sized(m->hash()))
2113 {
2114 fee_.update(Resource::feeMalformedRequest, "bad hash");
2115 return;
2116 }
2117
2118 uint256 const hash{m->hash()};
2119
2120 if (m->status() == protocol::tsHAVE)
2121 {
2122 std::lock_guard sl(recentLock_);
2123
2124 if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2125 recentTxSets_.end())
2126 {
2127 fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
2128 return;
2129 }
2130
2131 recentTxSets_.push_back(hash);
2132 }
2133}
2134
2135void
2136PeerImp::onValidatorListMessage(
2137 std::string const& messageType,
2138 std::string const& manifest,
2139 std::uint32_t version,
2140 std::vector<ValidatorBlobInfo> const& blobs)
2141{
2142 // If there are no blobs, the message is malformed (possibly because of
2143 // ValidatorList class rules), so charge accordingly and skip processing.
2144 if (blobs.empty())
2145 {
2146 JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
2147 << " from peer " << remote_address_;
2148 // This shouldn't ever happen with a well-behaved peer
2149 fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
2150 return;
2151 }
2152
2153 auto const hash = sha512Half(manifest, blobs, version);
2154
2155 JLOG(p_journal_.debug())
2156 << "Received " << messageType << " from " << remote_address_.to_string()
2157 << " (" << id_ << ")";
2158
2159 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2160 {
2161 JLOG(p_journal_.debug())
2162 << messageType << ": received duplicate " << messageType;
2163 // Charging this fee here won't hurt the peer in the normal
2164 // course of operation (ie. refresh every 5 minutes), but
2165 // will add up if the peer is misbehaving.
2166 fee_.update(Resource::feeUselessData, "duplicate");
2167 return;
2168 }
2169
2170 auto const applyResult = app_.validators().applyListsAndBroadcast(
2171 manifest,
2172 version,
2173 blobs,
2174 remote_address_.to_string(),
2175 hash,
2176 app_.overlay(),
2177 app_.getHashRouter(),
2178 app_.getOPs());
2179
2180 JLOG(p_journal_.debug())
2181 << "Processed " << messageType << " version " << version << " from "
2182 << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2183 : "unknown or invalid publisher")
2184 << " from " << remote_address_.to_string() << " (" << id_
2185 << ") with best result " << to_string(applyResult.bestDisposition());
2186
2187 // Act based on the best result
2188 switch (applyResult.bestDisposition())
2189 {
2190 // New list
2191 case ListDisposition::accepted:
2192 // Newest list is expired, and that needs to be broadcast, too
2193 case ListDisposition::expired:
2194 // Future list
2195 case ListDisposition::pending: {
2196 std::lock_guard<std::mutex> sl(recentLock_);
2197
2198 XRPL_ASSERT(
2199 applyResult.publisherKey,
2200 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2201 "set");
2202 auto const& pubKey = *applyResult.publisherKey;
2203#ifndef NDEBUG
2204 if (auto const iter = publisherListSequences_.find(pubKey);
2205 iter != publisherListSequences_.end())
2206 {
2207 XRPL_ASSERT(
2208 iter->second < applyResult.sequence,
2209 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2210 }
2211#endif
2212 publisherListSequences_[pubKey] = applyResult.sequence;
2213 }
2214 break;
2215 case ListDisposition::same_sequence:
2216 case ListDisposition::known_sequence:
2217#ifndef NDEBUG
2218 {
2219 std::lock_guard<std::mutex> sl(recentLock_);
2220 XRPL_ASSERT(
2221 applyResult.sequence && applyResult.publisherKey,
2222 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2223 "and set publisher key");
2224 XRPL_ASSERT(
2225 publisherListSequences_[*applyResult.publisherKey] <=
2226 applyResult.sequence,
2227 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2228 }
2229#endif // !NDEBUG
2230
2231 break;
2232 case ListDisposition::stale:
2233 case ListDisposition::untrusted:
2234 case ListDisposition::invalid:
2235 case ListDisposition::unsupported_version:
2236 break;
2237 default:
2238 UNREACHABLE(
2239 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2240 "disposition");
2241 }
2242
2243 // Charge based on the worst result
2244 switch (applyResult.worstDisposition())
2245 {
2246 case ListDisposition::accepted:
2247 case ListDisposition::expired:
2248 case ListDisposition::pending:
2249 // No charges for good data
2250 break;
2251 case ListDisposition::same_sequence:
2252 case ListDisposition::known_sequence:
2253 // Charging this fee here won't hurt the peer in the normal
2254 // course of operation (ie. refresh every 5 minutes), but
2255 // will add up if the peer is misbehaving.
2256 fee_.update(
2257 Resource::feeUselessData,
2258 " duplicate (same_sequence or known_sequence)");
2259 break;
2260 case ListDisposition::stale:
2261 // There are very few good reasons for a peer to send an
2262 // old list, particularly more than once.
2263 fee_.update(Resource::feeInvalidData, "expired");
2264 break;
2265 case ListDisposition::untrusted:
2266 // Charging this fee here won't hurt the peer in the normal
2267 // course of operation (ie. refresh every 5 minutes), but
2268 // will add up if the peer is misbehaving.
2269 fee_.update(Resource::feeUselessData, "untrusted");
2270 break;
2271 case ListDisposition::invalid:
2272 // This shouldn't ever happen with a well-behaved peer
2273 fee_.update(
2274 Resource::feeInvalidSignature, "invalid list disposition");
2275 break;
2276 case ListDisposition::unsupported_version:
2277 // During a version transition, this may be legitimate.
2278 // If it happens frequently, that's probably bad.
2279 fee_.update(Resource::feeInvalidData, "version");
2280 break;
2281 default:
2282 UNREACHABLE(
2283 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2284 "disposition");
2285 }
2286
2287 // Log based on all the results.
2288 for (auto const& [disp, count] : applyResult.dispositions)
2289 {
2290 switch (disp)
2291 {
2292 // New list
2293 case ListDisposition::accepted:
2294 JLOG(p_journal_.debug())
2295 << "Applied " << count << " new " << messageType
2296 << "(s) from peer " << remote_address_;
2297 break;
2298 // Newest list is expired, and that needs to be broadcast, too
2299 case ListDisposition::expired:
2300 JLOG(p_journal_.debug())
2301 << "Applied " << count << " expired " << messageType
2302 << "(s) from peer " << remote_address_;
2303 break;
2304 // Future list
2305 case ListDisposition::pending:
2306 JLOG(p_journal_.debug())
2307 << "Processed " << count << " future " << messageType
2308 << "(s) from peer " << remote_address_;
2309 break;
2310 case ListDisposition::same_sequence:
2311 JLOG(p_journal_.warn())
2312 << "Ignored " << count << " " << messageType
2313 << "(s) with current sequence from peer "
2314 << remote_address_;
2315 break;
2316 case ListDisposition::known_sequence:
2317 JLOG(p_journal_.warn())
2318 << "Ignored " << count << " " << messageType
2319 << "(s) with future sequence from peer " << remote_address_;
2320 break;
2321 case ListDisposition::stale:
2322 JLOG(p_journal_.warn())
2323 << "Ignored " << count << "stale " << messageType
2324 << "(s) from peer " << remote_address_;
2325 break;
2326 case ListDisposition::untrusted:
2327 JLOG(p_journal_.warn())
2328 << "Ignored " << count << " untrusted " << messageType
2329 << "(s) from peer " << remote_address_;
2330 break;
2331 case ListDisposition::unsupported_version:
2332 JLOG(p_journal_.warn())
2333 << "Ignored " << count << "unsupported version "
2334 << messageType << "(s) from peer " << remote_address_;
2335 break;
2336 case ListDisposition::invalid:
2337 JLOG(p_journal_.warn())
2338 << "Ignored " << count << "invalid " << messageType
2339 << "(s) from peer " << remote_address_;
2340 break;
2341 default:
2342 UNREACHABLE(
2343 "ripple::PeerImp::onValidatorListMessage : invalid list "
2344 "disposition");
2345 }
2346 }
2347}
2348
2349void
2351{
2352 try
2353 {
2354 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2355 {
2356 JLOG(p_journal_.debug())
2357 << "ValidatorList: received validator list from peer using "
2358 << "protocol version " << to_string(protocol_)
2359 << " which shouldn't support this feature.";
2360 fee_.update(Resource::feeUselessData, "unsupported peer");
2361 return;
2362 }
2363 onValidatorListMessage(
2364 "ValidatorList",
2365 m->manifest(),
2366 m->version(),
2367 ValidatorList::parseBlobs(*m));
2368 }
2369 catch (std::exception const& e)
2370 {
2371 JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what()
2372 << " from peer " << remote_address_;
2373 using namespace std::string_literals;
2374 fee_.update(Resource::feeInvalidData, e.what());
2375 }
2376}
2377
2378void
2379PeerImp::onMessage(
2381{
2382 try
2383 {
2384 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2385 {
2386 JLOG(p_journal_.debug())
2387 << "ValidatorListCollection: received validator list from peer "
2388 << "using protocol version " << to_string(protocol_)
2389 << " which shouldn't support this feature.";
2390 fee_.update(Resource::feeUselessData, "unsupported peer");
2391 return;
2392 }
2393 else if (m->version() < 2)
2394 {
2395 JLOG(p_journal_.debug())
2396 << "ValidatorListCollection: received invalid validator list "
2397 "version "
2398 << m->version() << " from peer using protocol version "
2399 << to_string(protocol_);
2400 fee_.update(Resource::feeInvalidData, "wrong version");
2401 return;
2402 }
2403 onValidatorListMessage(
2404 "ValidatorListCollection",
2405 m->manifest(),
2406 m->version(),
2407 ValidatorList::parseBlobs(*m));
2408 }
2409 catch (std::exception const& e)
2410 {
2411 JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, "
2412 << e.what() << " from peer " << remote_address_;
2413 using namespace std::string_literals;
2414 fee_.update(Resource::feeInvalidData, e.what());
2415 }
2416}
2417
2418void
2420{
2421 if (m->validation().size() < 50)
2422 {
2423 JLOG(p_journal_.warn()) << "Validation: Too small";
2424 fee_.update(Resource::feeMalformedRequest, "too small");
2425 return;
2426 }
2427
2428 try
2429 {
2430 auto const closeTime = app_.timeKeeper().closeTime();
2431
2433 {
2434 SerialIter sit(makeSlice(m->validation()));
2436 std::ref(sit),
2437 [this](PublicKey const& pk) {
2438 return calcNodeID(
2439 app_.validatorManifests().getMasterKey(pk));
2440 },
2441 false);
2442 val->setSeen(closeTime);
2443 }
2444
2445 if (!isCurrent(
2446 app_.getValidations().parms(),
2447 app_.timeKeeper().closeTime(),
2448 val->getSignTime(),
2449 val->getSeenTime()))
2450 {
2451 JLOG(p_journal_.trace()) << "Validation: Not current";
2452 fee_.update(Resource::feeUselessData, "not current");
2453 return;
2454 }
2455
2456 // RH TODO: when isTrusted = false we should probably also cache a key
2457 // suppression for 30 seconds to avoid doing a relatively expensive
2458 // lookup every time a spam packet is received
2459 auto const isTrusted =
2460 app_.validators().trusted(val->getSignerPublic());
2461
2462 // If the operator has specified that untrusted validations be
2463 // dropped then this happens here I.e. before further wasting CPU
2464 // verifying the signature of an untrusted key
2465 if (!isTrusted)
2466 {
2467 // increase untrusted validations received
2468 overlay_.reportInboundTraffic(
2469 TrafficCount::category::validation_untrusted,
2470 Message::messageSize(*m));
2471
2472 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2473 return;
2474 }
2475
2476 auto key = sha512Half(makeSlice(m->validation()));
2477
2478 auto [added, relayed] =
2479 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2480
2481 if (!added)
2482 {
2483 // Count unique messages (Slots has it's own 'HashRouter'), which a
2484 // peer receives within IDLED seconds since the message has been
2485 // relayed.
2486 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
2487 overlay_.updateSlotAndSquelch(
2488 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2489
2490 // increase duplicate validations received
2491 overlay_.reportInboundTraffic(
2492 TrafficCount::category::validation_duplicate,
2493 Message::messageSize(*m));
2494
2495 JLOG(p_journal_.trace()) << "Validation: duplicate";
2496 return;
2497 }
2498
2499 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2500 {
2501 JLOG(p_journal_.debug())
2502 << "Dropping untrusted validation from diverged peer";
2503 }
2504 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2505 {
2506 std::string const name = [isTrusted, val]() {
2507 std::string ret =
2508 isTrusted ? "Trusted validation" : "Untrusted validation";
2509
2510#ifdef DEBUG
2511 ret += " " +
2512 std::to_string(val->getFieldU32(sfLedgerSequence)) + ": " +
2513 to_string(val->getNodeID());
2514#endif
2515
2516 return ret;
2517 }();
2518
2519 std::weak_ptr<PeerImp> weak = shared_from_this();
2520 app_.getJobQueue().addJob(
2521 isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
2522 name,
2523 [weak, val, m, key]() {
2524 if (auto peer = weak.lock())
2525 peer->checkValidation(val, key, m);
2526 });
2527 }
2528 else
2529 {
2530 JLOG(p_journal_.debug())
2531 << "Dropping untrusted validation for load";
2532 }
2533 }
2534 catch (std::exception const& e)
2535 {
2536 JLOG(p_journal_.warn())
2537 << "Exception processing validation: " << e.what();
2538 using namespace std::string_literals;
2539 fee_.update(Resource::feeMalformedRequest, e.what());
2540 }
2541}
2542
2543void
2545{
2546 protocol::TMGetObjectByHash& packet = *m;
2547
2548 JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
2549 << " " << packet.objects_size();
2550
2551 if (packet.query())
2552 {
2553 // this is a query
2554 if (send_queue_.size() >= Tuning::dropSendQueue)
2555 {
2556 JLOG(p_journal_.debug()) << "GetObject: Large send queue";
2557 return;
2558 }
2559
2560 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2561 {
2562 doFetchPack(m);
2563 return;
2564 }
2565
2566 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2567 {
2568 if (!txReduceRelayEnabled())
2569 {
2570 JLOG(p_journal_.error())
2571 << "TMGetObjectByHash: tx reduce-relay is disabled";
2572 fee_.update(Resource::feeMalformedRequest, "disabled");
2573 return;
2574 }
2575
2576 std::weak_ptr<PeerImp> weak = shared_from_this();
2577 app_.getJobQueue().addJob(
2578 jtREQUESTED_TXN, "doTransactions", [weak, m]() {
2579 if (auto peer = weak.lock())
2580 peer->doTransactions(m);
2581 });
2582 return;
2583 }
2584
2585 protocol::TMGetObjectByHash reply;
2586
2587 reply.set_query(false);
2588
2589 if (packet.has_seq())
2590 reply.set_seq(packet.seq());
2591
2592 reply.set_type(packet.type());
2593
2594 if (packet.has_ledgerhash())
2595 {
2596 if (!stringIsUint256Sized(packet.ledgerhash()))
2597 {
2598 fee_.update(Resource::feeMalformedRequest, "ledger hash");
2599 return;
2600 }
2601
2602 reply.set_ledgerhash(packet.ledgerhash());
2603 }
2604
2605 fee_.update(
2606 Resource::feeModerateBurdenPeer,
2607 " received a get object by hash request");
2608
2609 // This is a very minimal implementation
2610 for (int i = 0; i < packet.objects_size(); ++i)
2611 {
2612 auto const& obj = packet.objects(i);
2613 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2614 {
2615 uint256 const hash{obj.hash()};
2616 // VFALCO TODO Move this someplace more sensible so we dont
2617 // need to inject the NodeStore interfaces.
2618 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2619 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2620 if (nodeObject)
2621 {
2622 protocol::TMIndexedObject& newObj = *reply.add_objects();
2623 newObj.set_hash(hash.begin(), hash.size());
2624 newObj.set_data(
2625 &nodeObject->getData().front(),
2626 nodeObject->getData().size());
2627
2628 if (obj.has_nodeid())
2629 newObj.set_index(obj.nodeid());
2630 if (obj.has_ledgerseq())
2631 newObj.set_ledgerseq(obj.ledgerseq());
2632
2633 // VFALCO NOTE "seq" in the message is obsolete
2634 }
2635 }
2636 }
2637
2638 JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
2639 << packet.objects_size();
2640 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2641 }
2642 else
2643 {
2644 // this is a reply
2645 std::uint32_t pLSeq = 0;
2646 bool pLDo = true;
2647 bool progress = false;
2648
2649 for (int i = 0; i < packet.objects_size(); ++i)
2650 {
2651 protocol::TMIndexedObject const& obj = packet.objects(i);
2652
2653 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2654 {
2655 if (obj.has_ledgerseq())
2656 {
2657 if (obj.ledgerseq() != pLSeq)
2658 {
2659 if (pLDo && (pLSeq != 0))
2660 {
2661 JLOG(p_journal_.debug())
2662 << "GetObj: Full fetch pack for " << pLSeq;
2663 }
2664 pLSeq = obj.ledgerseq();
2665 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2666
2667 if (!pLDo)
2668 {
2669 JLOG(p_journal_.debug())
2670 << "GetObj: Late fetch pack for " << pLSeq;
2671 }
2672 else
2673 progress = true;
2674 }
2675 }
2676
2677 if (pLDo)
2678 {
2679 uint256 const hash{obj.hash()};
2680
2681 app_.getLedgerMaster().addFetchPack(
2682 hash,
2684 obj.data().begin(), obj.data().end()));
2685 }
2686 }
2687 }
2688
2689 if (pLDo && (pLSeq != 0))
2690 {
2691 JLOG(p_journal_.debug())
2692 << "GetObj: Partial fetch pack for " << pLSeq;
2693 }
2694 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2695 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2696 }
2697}
2698
2699void
2701{
2702 if (!txReduceRelayEnabled())
2703 {
2704 JLOG(p_journal_.error())
2705 << "TMHaveTransactions: tx reduce-relay is disabled";
2706 fee_.update(Resource::feeMalformedRequest, "disabled");
2707 return;
2708 }
2709
2710 std::weak_ptr<PeerImp> weak = shared_from_this();
2711 app_.getJobQueue().addJob(
2712 jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
2713 if (auto peer = weak.lock())
2714 peer->handleHaveTransactions(m);
2715 });
2716}
2717
2718void
2719PeerImp::handleHaveTransactions(
2721{
2722 protocol::TMGetObjectByHash tmBH;
2723 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2724 tmBH.set_query(true);
2725
2726 JLOG(p_journal_.trace())
2727 << "received TMHaveTransactions " << m->hashes_size();
2728
2729 for (std::uint32_t i = 0; i < m->hashes_size(); i++)
2730 {
2731 if (!stringIsUint256Sized(m->hashes(i)))
2732 {
2733 JLOG(p_journal_.error())
2734 << "TMHaveTransactions with invalid hash size";
2735 fee_.update(Resource::feeMalformedRequest, "hash size");
2736 return;
2737 }
2738
2739 uint256 hash(m->hashes(i));
2740
2741 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2742
2743 JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;
2744
2745 if (!txn)
2746 {
2747 JLOG(p_journal_.debug()) << "adding transaction to request";
2748
2749 auto obj = tmBH.add_objects();
2750 obj->set_hash(hash.data(), hash.size());
2751 }
2752 else
2753 {
2754 // Erase only if a peer has seen this tx. If the peer has not
2755 // seen this tx then the tx could not has been queued for this
2756 // peer.
2757 removeTxQueue(hash);
2758 }
2759 }
2760
2761 JLOG(p_journal_.trace())
2762 << "transaction request object is " << tmBH.objects_size();
2763
2764 if (tmBH.objects_size() > 0)
2765 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2766}
2767
2768void
2770{
2771 if (!txReduceRelayEnabled())
2772 {
2773 JLOG(p_journal_.error())
2774 << "TMTransactions: tx reduce-relay is disabled";
2775 fee_.update(Resource::feeMalformedRequest, "disabled");
2776 return;
2777 }
2778
2779 JLOG(p_journal_.trace())
2780 << "received TMTransactions " << m->transactions_size();
2781
2782 overlay_.addTxMetrics(m->transactions_size());
2783
2784 for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
2785 handleTransaction(
2787 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2788 false,
2789 true);
2790}
2791
2792void
2793PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
2794{
2795 using on_message_fn =
2797 if (!strand_.running_in_this_thread())
2798 return post(
2799 strand_,
2800 std::bind(
2801 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2802
2803 if (!m->has_validatorpubkey())
2804 {
2805 fee_.update(Resource::feeInvalidData, "squelch no pubkey");
2806 return;
2807 }
2808 auto validator = m->validatorpubkey();
2809 auto const slice{makeSlice(validator)};
2810 if (!publicKeyType(slice))
2811 {
2812 fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
2813 return;
2814 }
2815 PublicKey key(slice);
2816
2817 // Ignore the squelch for validator's own messages.
2818 if (key == app_.getValidationPublicKey())
2819 {
2820 JLOG(p_journal_.debug())
2821 << "onMessage: TMSquelch discarding validator's squelch " << slice;
2822 return;
2823 }
2824
2825 std::uint32_t duration =
2826 m->has_squelchduration() ? m->squelchduration() : 0;
2827 if (!m->squelch())
2828 squelch_.removeSquelch(key);
2829 else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
2830 fee_.update(Resource::feeInvalidData, "squelch duration");
2831
2832 JLOG(p_journal_.debug())
2833 << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
2834}
2835
2836//--------------------------------------------------------------------------
2837
2838void
2839PeerImp::addLedger(
2840 uint256 const& hash,
2841 std::lock_guard<std::mutex> const& lockedRecentLock)
2842{
2843 // lockedRecentLock is passed as a reminder that recentLock_ must be
2844 // locked by the caller.
2845 (void)lockedRecentLock;
2846
2847 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2848 recentLedgers_.end())
2849 return;
2850
2851 recentLedgers_.push_back(hash);
2852}
2853
2854void
2855PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2856{
2857 // VFALCO TODO Invert this dependency using an observer and shared state
2858 // object. Don't queue fetch pack jobs if we're under load or we already
2859 // have some queued.
2860 if (app_.getFeeTrack().isLoadedLocal() ||
2861 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2862 (app_.getJobQueue().getJobCount(jtPACK) > 10))
2863 {
2864 JLOG(p_journal_.info()) << "Too busy to make fetch pack";
2865 return;
2866 }
2867
2868 if (!stringIsUint256Sized(packet->ledgerhash()))
2869 {
2870 JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
2871 fee_.update(Resource::feeMalformedRequest, "hash size");
2872 return;
2873 }
2874
2875 fee_.fee = Resource::feeHeavyBurdenPeer;
2876
2877 uint256 const hash{packet->ledgerhash()};
2878
2879 std::weak_ptr<PeerImp> weak = shared_from_this();
2880 auto elapsed = UptimeClock::now();
2881 auto const pap = &app_;
2882 app_.getJobQueue().addJob(
2883 jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2884 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2885 });
2886}
2887
2888void
2889PeerImp::doTransactions(
2891{
2892 protocol::TMTransactions reply;
2893
2894 JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
2895 << packet->objects_size();
2896
2897 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2898 {
2899 JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
2900 fee_.update(Resource::feeMalformedRequest, "too big");
2901 return;
2902 }
2903
2904 for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
2905 {
2906 auto const& obj = packet->objects(i);
2907
2908 if (!stringIsUint256Sized(obj.hash()))
2909 {
2910 fee_.update(Resource::feeMalformedRequest, "hash size");
2911 return;
2912 }
2913
2914 uint256 hash(obj.hash());
2915
2916 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2917
2918 if (!txn)
2919 {
2920 JLOG(p_journal_.error()) << "doTransactions, transaction not found "
2921 << Slice(hash.data(), hash.size());
2922 fee_.update(Resource::feeMalformedRequest, "tx not found");
2923 return;
2924 }
2925
2926 Serializer s;
2927 auto tx = reply.add_transactions();
2928 auto sttx = txn->getSTransaction();
2929 sttx->add(s);
2930 tx->set_rawtransaction(s.data(), s.size());
2931 tx->set_status(
2932 txn->getStatus() == INCLUDED ? protocol::tsCURRENT
2933 : protocol::tsNEW);
2934 tx->set_receivetimestamp(
2935 app_.timeKeeper().now().time_since_epoch().count());
2936 tx->set_deferred(txn->getSubmitResult().queued);
2937 }
2938
2939 if (reply.transactions_size() > 0)
2940 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2941}
2942
2943void
2944PeerImp::checkTransaction(
2945 HashRouterFlags flags,
2946 bool checkSignature,
2947 std::shared_ptr<STTx const> const& stx,
2948 bool batch)
2949{
2950 // VFALCO TODO Rewrite to not use exceptions
2951 try
2952 {
2953 // charge strongly for relaying batch txns
2954 // LCOV_EXCL_START
2955 if (stx->isFlag(tfInnerBatchTxn) &&
2956 getCurrentTransactionRules()->enabled(featureBatch))
2957 {
2958 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
2959 "tfInnerBatchTxn (checkSignature).";
2960 charge(Resource::feeModerateBurdenPeer, "inner batch txn");
2961 return;
2962 }
2963 // LCOV_EXCL_STOP
2964
2965 // Expired?
2966 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2967 (stx->getFieldU32(sfLastLedgerSequence) <
2968 app_.getLedgerMaster().getValidLedgerIndex()))
2969 {
2970 JLOG(p_journal_.info())
2971 << "Marking transaction " << stx->getTransactionID()
2972 << "as BAD because it's expired";
2973 app_.getHashRouter().setFlags(
2974 stx->getTransactionID(), HashRouterFlags::BAD);
2975 charge(Resource::feeUselessData, "expired tx");
2976 return;
2977 }
2978
2979 if (isPseudoTx(*stx))
2980 {
2981 // Don't do anything with pseudo transactions except put them in the
2982 // TransactionMaster cache
2983 std::string reason;
2984 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2985 XRPL_ASSERT(
2986 tx->getStatus() == NEW,
2987 "ripple::PeerImp::checkTransaction Transaction created "
2988 "correctly");
2989 if (tx->getStatus() == NEW)
2990 {
2991 JLOG(p_journal_.debug())
2992 << "Processing " << (batch ? "batch" : "unsolicited")
2993 << " pseudo-transaction tx " << tx->getID();
2994
2995 app_.getMasterTransaction().canonicalize(&tx);
2996 // Tell the overlay about it, but don't relay it.
2997 auto const toSkip =
2998 app_.getHashRouter().shouldRelay(tx->getID());
2999 if (toSkip)
3000 {
3001 JLOG(p_journal_.debug())
3002 << "Passing skipped pseudo pseudo-transaction tx "
3003 << tx->getID();
3004 app_.overlay().relay(tx->getID(), {}, *toSkip);
3005 }
3006 if (!batch)
3007 {
3008 JLOG(p_journal_.debug())
3009 << "Charging for pseudo-transaction tx " << tx->getID();
3010 charge(Resource::feeUselessData, "pseudo tx");
3011 }
3012
3013 return;
3014 }
3015 }
3016
3017 if (checkSignature)
3018 {
3019 // Check the signature before handing off to the job queue.
3020 if (auto [valid, validReason] = checkValidity(
3021 app_.getHashRouter(),
3022 *stx,
3023 app_.getLedgerMaster().getValidatedRules(),
3024 app_.config());
3025 valid != Validity::Valid)
3026 {
3027 if (!validReason.empty())
3028 {
3029 JLOG(p_journal_.debug())
3030 << "Exception checking transaction: " << validReason;
3031 }
3032
3033 // Probably not necessary to set HashRouterFlags::BAD, but
3034 // doesn't hurt.
3035 app_.getHashRouter().setFlags(
3036 stx->getTransactionID(), HashRouterFlags::BAD);
3037 charge(
3038 Resource::feeInvalidSignature,
3039 "check transaction signature failure");
3040 return;
3041 }
3042 }
3043 else
3044 {
3046 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
3047 }
3048
3049 std::string reason;
3050 auto tx = std::make_shared<Transaction>(stx, reason, app_);
3051
3052 if (tx->getStatus() == INVALID)
3053 {
3054 if (!reason.empty())
3055 {
3056 JLOG(p_journal_.debug())
3057 << "Exception checking transaction: " << reason;
3058 }
3059 app_.getHashRouter().setFlags(
3060 stx->getTransactionID(), HashRouterFlags::BAD);
3061 charge(Resource::feeInvalidSignature, "tx (impossible)");
3062 return;
3063 }
3064
3065 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
3066 app_.getOPs().processTransaction(
3067 tx, trusted, false, NetworkOPs::FailHard::no);
3068 }
3069 catch (std::exception const& ex)
3070 {
3071 JLOG(p_journal_.warn())
3072 << "Exception in " << __func__ << ": " << ex.what();
3073 app_.getHashRouter().setFlags(
3074 stx->getTransactionID(), HashRouterFlags::BAD);
3075 using namespace std::string_literals;
3076 charge(Resource::feeInvalidData, "tx "s + ex.what());
3077 }
3078}
3079
3080// Called from our JobQueue
3081void
3082PeerImp::checkPropose(
3083 bool isTrusted,
3085 RCLCxPeerPos peerPos)
3086{
3087 JLOG(p_journal_.trace())
3088 << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";
3089
3090 XRPL_ASSERT(packet, "ripple::PeerImp::checkPropose : non-null packet");
3091
3092 if (!cluster() && !peerPos.checkSign())
3093 {
3094 std::string desc{"Proposal fails sig check"};
3095 JLOG(p_journal_.warn()) << desc;
3096 charge(Resource::feeInvalidSignature, desc);
3097 return;
3098 }
3099
3100 bool relay;
3101
3102 if (isTrusted)
3103 relay = app_.getOPs().processTrustedProposal(peerPos);
3104 else
3105 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3106
3107 if (relay)
3108 {
3109 // haveMessage contains peers, which are suppressed; i.e. the peers
3110 // are the source of the message, consequently the message should
3111 // not be relayed to these peers. But the message must be counted
3112 // as part of the squelch logic.
3113 auto haveMessage = app_.overlay().relay(
3114 *packet, peerPos.suppressionID(), peerPos.publicKey());
3115 if (!haveMessage.empty())
3116 overlay_.updateSlotAndSquelch(
3117 peerPos.suppressionID(),
3118 peerPos.publicKey(),
3119 std::move(haveMessage),
3120 protocol::mtPROPOSE_LEDGER);
3121 }
3122}
3123
3124void
3125PeerImp::checkValidation(
3127 uint256 const& key,
3129{
3130 if (!val->isValid())
3131 {
3132 std::string desc{"Validation forwarded by peer is invalid"};
3133 JLOG(p_journal_.debug()) << desc;
3134 charge(Resource::feeInvalidSignature, desc);
3135 return;
3136 }
3137
3138 // FIXME it should be safe to remove this try/catch. Investigate codepaths.
3139 try
3140 {
3141 if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
3142 cluster())
3143 {
3144 // haveMessage contains peers, which are suppressed; i.e. the peers
3145 // are the source of the message, consequently the message should
3146 // not be relayed to these peers. But the message must be counted
3147 // as part of the squelch logic.
3148 auto haveMessage =
3149 overlay_.relay(*packet, key, val->getSignerPublic());
3150 if (!haveMessage.empty())
3151 {
3152 overlay_.updateSlotAndSquelch(
3153 key,
3154 val->getSignerPublic(),
3155 std::move(haveMessage),
3156 protocol::mtVALIDATION);
3157 }
3158 }
3159 }
3160 catch (std::exception const& ex)
3161 {
3162 JLOG(p_journal_.trace())
3163 << "Exception processing validation: " << ex.what();
3164 using namespace std::string_literals;
3165 charge(Resource::feeMalformedRequest, "validation "s + ex.what());
3166 }
3167}
3168
3169// Returns the set of peers that can help us get
3170// the TX tree with the specified root hash.
3171//
3173getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
3174{
3176 int retScore = 0;
3177
3179 if (p->hasTxSet(rootHash) && p.get() != skip)
3180 {
3181 auto score = p->getScore(true);
3182 if (!ret || (score > retScore))
3183 {
3184 ret = std::move(p);
3185 retScore = score;
3186 }
3187 }
3188 });
3189
3190 return ret;
3191}
3192
3193// Returns a random peer weighted by how likely to
3194// have the ledger and how responsive it is.
3195//
3198 OverlayImpl& ov,
3199 uint256 const& ledgerHash,
3200 LedgerIndex ledger,
3201 PeerImp const* skip)
3202{
3204 int retScore = 0;
3205
3207 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3208 {
3209 auto score = p->getScore(true);
3210 if (!ret || (score > retScore))
3211 {
3212 ret = std::move(p);
3213 retScore = score;
3214 }
3215 }
3216 });
3217
3218 return ret;
3219}
3220
3221void
3222PeerImp::sendLedgerBase(
3223 std::shared_ptr<Ledger const> const& ledger,
3224 protocol::TMLedgerData& ledgerData)
3225{
3226 JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
3227
3228 Serializer s(sizeof(LedgerInfo));
3229 addRaw(ledger->info(), s);
3230 ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3231
3232 auto const& stateMap{ledger->stateMap()};
3233 if (stateMap.getHash() != beast::zero)
3234 {
3235 // Return account state root node if possible
3236 Serializer root(768);
3237
3238 stateMap.serializeRoot(root);
3239 ledgerData.add_nodes()->set_nodedata(
3240 root.getDataPtr(), root.getLength());
3241
3242 if (ledger->info().txHash != beast::zero)
3243 {
3244 auto const& txMap{ledger->txMap()};
3245 if (txMap.getHash() != beast::zero)
3246 {
3247 // Return TX root node if possible
3248 root.erase();
3249 txMap.serializeRoot(root);
3250 ledgerData.add_nodes()->set_nodedata(
3251 root.getDataPtr(), root.getLength());
3252 }
3253 }
3254 }
3255
3256 auto message{
3257 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3258 send(message);
3259}
3260
3262PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
3263{
3264 JLOG(p_journal_.trace()) << "getLedger: Ledger";
3265
3267
3268 if (m->has_ledgerhash())
3269 {
3270 // Attempt to find ledger by hash
3271 uint256 const ledgerHash{m->ledgerhash()};
3272 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3273 if (!ledger)
3274 {
3275 JLOG(p_journal_.trace())
3276 << "getLedger: Don't have ledger with hash " << ledgerHash;
3277
3278 if (m->has_querytype() && !m->has_requestcookie())
3279 {
3280 // Attempt to relay the request to a peer
3281 if (auto const peer = getPeerWithLedger(
3282 overlay_,
3283 ledgerHash,
3284 m->has_ledgerseq() ? m->ledgerseq() : 0,
3285 this))
3286 {
3287 m->set_requestcookie(id());
3288 peer->send(
3289 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3290 JLOG(p_journal_.debug())
3291 << "getLedger: Request relayed to peer";
3292 return ledger;
3293 }
3294
3295 JLOG(p_journal_.trace())
3296 << "getLedger: Failed to find peer to relay request";
3297 }
3298 }
3299 }
3300 else if (m->has_ledgerseq())
3301 {
3302 // Attempt to find ledger by sequence
3303 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3304 {
3305 JLOG(p_journal_.debug())
3306 << "getLedger: Early ledger sequence request";
3307 }
3308 else
3309 {
3310 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3311 if (!ledger)
3312 {
3313 JLOG(p_journal_.debug())
3314 << "getLedger: Don't have ledger with sequence "
3315 << m->ledgerseq();
3316 }
3317 }
3318 }
3319 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3320 {
3321 ledger = app_.getLedgerMaster().getClosedLedger();
3322 }
3323
3324 if (ledger)
3325 {
3326 // Validate retrieved ledger sequence
3327 auto const ledgerSeq{ledger->info().seq};
3328 if (m->has_ledgerseq())
3329 {
3330 if (ledgerSeq != m->ledgerseq())
3331 {
3332 // Do not resource charge a peer responding to a relay
3333 if (!m->has_requestcookie())
3334 charge(
3335 Resource::feeMalformedRequest, "get_ledger ledgerSeq");
3336
3337 ledger.reset();
3338 JLOG(p_journal_.warn())
3339 << "getLedger: Invalid ledger sequence " << ledgerSeq;
3340 }
3341 }
3342 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3343 {
3344 ledger.reset();
3345 JLOG(p_journal_.debug())
3346 << "getLedger: Early ledger sequence request " << ledgerSeq;
3347 }
3348 }
3349 else
3350 {
3351 JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
3352 }
3353
3354 return ledger;
3355}
3356
3358PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
3359{
3360 JLOG(p_journal_.trace()) << "getTxSet: TX set";
3361
3362 uint256 const txSetHash{m->ledgerhash()};
3364 app_.getInboundTransactions().getSet(txSetHash, false)};
3365 if (!shaMap)
3366 {
3367 if (m->has_querytype() && !m->has_requestcookie())
3368 {
3369 // Attempt to relay the request to a peer
3370 if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
3371 {
3372 m->set_requestcookie(id());
3373 peer->send(
3374 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3375 JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
3376 }
3377 else
3378 {
3379 JLOG(p_journal_.debug())
3380 << "getTxSet: Failed to find relay peer";
3381 }
3382 }
3383 else
3384 {
3385 JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
3386 }
3387 }
3388
3389 return shaMap;
3390}
3391
3392void
3393PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
3394{
3395 // Do not resource charge a peer responding to a relay
3396 if (!m->has_requestcookie())
3397 charge(
3398 Resource::feeModerateBurdenPeer, "received a get ledger request");
3399
3402 SHAMap const* map{nullptr};
3403 protocol::TMLedgerData ledgerData;
3404 bool fatLeaves{true};
3405 auto const itype{m->itype()};
3406
3407 if (itype == protocol::liTS_CANDIDATE)
3408 {
3409 if (sharedMap = getTxSet(m); !sharedMap)
3410 return;
3411 map = sharedMap.get();
3412
3413 // Fill out the reply
3414 ledgerData.set_ledgerseq(0);
3415 ledgerData.set_ledgerhash(m->ledgerhash());
3416 ledgerData.set_type(protocol::liTS_CANDIDATE);
3417 if (m->has_requestcookie())
3418 ledgerData.set_requestcookie(m->requestcookie());
3419
3420 // We'll already have most transactions
3421 fatLeaves = false;
3422 }
3423 else
3424 {
3425 if (send_queue_.size() >= Tuning::dropSendQueue)
3426 {
3427 JLOG(p_journal_.debug())
3428 << "processLedgerRequest: Large send queue";
3429 return;
3430 }
3431 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3432 {
3433 JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
3434 return;
3435 }
3436
3437 if (ledger = getLedger(m); !ledger)
3438 return;
3439
3440 // Fill out the reply
3441 auto const ledgerHash{ledger->info().hash};
3442 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3443 ledgerData.set_ledgerseq(ledger->info().seq);
3444 ledgerData.set_type(itype);
3445 if (m->has_requestcookie())
3446 ledgerData.set_requestcookie(m->requestcookie());
3447
3448 switch (itype)
3449 {
3450 case protocol::liBASE:
3451 sendLedgerBase(ledger, ledgerData);
3452 return;
3453
3454 case protocol::liTX_NODE:
3455 map = &ledger->txMap();
3456 JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
3457 << to_string(map->getHash());
3458 break;
3459
3460 case protocol::liAS_NODE:
3461 map = &ledger->stateMap();
3462 JLOG(p_journal_.trace())
3463 << "processLedgerRequest: Account state map hash "
3464 << to_string(map->getHash());
3465 break;
3466
3467 default:
3468 // This case should not be possible here
3469 JLOG(p_journal_.error())
3470 << "processLedgerRequest: Invalid ledger info type";
3471 return;
3472 }
3473 }
3474
3475 if (!map)
3476 {
3477 JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
3478 return;
3479 }
3480
3481 // Add requested node data to reply
3482 if (m->nodeids_size() > 0)
3483 {
3484 auto const queryDepth{
3485 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3486
3488
3489 for (int i = 0; i < m->nodeids_size() &&
3490 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3491 ++i)
3492 {
3493 auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
3494
3495 data.clear();
3496 data.reserve(Tuning::softMaxReplyNodes);
3497
3498 try
3499 {
3500 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3501 {
3502 JLOG(p_journal_.trace())
3503 << "processLedgerRequest: getNodeFat got "
3504 << data.size() << " nodes";
3505
3506 for (auto const& d : data)
3507 {
3508 if (ledgerData.nodes_size() >=
3509 Tuning::hardMaxReplyNodes)
3510 break;
3511 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3512 node->set_nodeid(d.first.getRawString());
3513 node->set_nodedata(d.second.data(), d.second.size());
3514 }
3515 }
3516 else
3517 {
3518 JLOG(p_journal_.warn())
3519 << "processLedgerRequest: getNodeFat returns false";
3520 }
3521 }
3522 catch (std::exception const& e)
3523 {
3524 std::string info;
3525 switch (itype)
3526 {
3527 case protocol::liBASE:
3528 // This case should not be possible here
3529 info = "Ledger base";
3530 break;
3531
3532 case protocol::liTX_NODE:
3533 info = "TX node";
3534 break;
3535
3536 case protocol::liAS_NODE:
3537 info = "AS node";
3538 break;
3539
3540 case protocol::liTS_CANDIDATE:
3541 info = "TS candidate";
3542 break;
3543
3544 default:
3545 info = "Invalid";
3546 break;
3547 }
3548
3549 if (!m->has_ledgerhash())
3550 info += ", no hash specified";
3551
3552 JLOG(p_journal_.warn())
3553 << "processLedgerRequest: getNodeFat with nodeId "
3554 << *shaMapNodeId << " and ledger info type " << info
3555 << " throws exception: " << e.what();
3556 }
3557 }
3558
3559 JLOG(p_journal_.info())
3560 << "processLedgerRequest: Got request for " << m->nodeids_size()
3561 << " nodes at depth " << queryDepth << ", return "
3562 << ledgerData.nodes_size() << " nodes";
3563 }
3564
3565 if (ledgerData.nodes_size() == 0)
3566 return;
3567
3568 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3569}
3570
3571int
3572PeerImp::getScore(bool haveItem) const
3573{
3574 // Random component of score, used to break ties and avoid
3575 // overloading the "best" peer
3576 static int const spRandomMax = 9999;
3577
3578 // Score for being very likely to have the thing we are
3579 // look for; should be roughly spRandomMax
3580 static int const spHaveItem = 10000;
3581
3582 // Score reduction for each millisecond of latency; should
3583 // be roughly spRandomMax divided by the maximum reasonable
3584 // latency
3585 static int const spLatency = 30;
3586
3587 // Penalty for unknown latency; should be roughly spRandomMax
3588 static int const spNoLatency = 8000;
3589
3590 int score = rand_int(spRandomMax);
3591
3592 if (haveItem)
3593 score += spHaveItem;
3594
3596 {
3597 std::lock_guard sl(recentLock_);
3598 latency = latency_;
3599 }
3600
3601 if (latency)
3602 score -= latency->count() * spLatency;
3603 else
3604 score -= spNoLatency;
3605
3606 return score;
3607}
3608
3609bool
3610PeerImp::isHighLatency() const
3611{
3612 std::lock_guard sl(recentLock_);
3613 return latency_ >= peerHighLatency;
3614}
3615
3616void
3617PeerImp::Metrics::add_message(std::uint64_t bytes)
3618{
3619 using namespace std::chrono_literals;
3620 std::unique_lock lock{mutex_};
3621
3622 totalBytes_ += bytes;
3623 accumBytes_ += bytes;
3624 auto const timeElapsed = clock_type::now() - intervalStart_;
3625 auto const timeElapsedInSecs =
3626 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3627
3628 if (timeElapsedInSecs >= 1s)
3629 {
3630 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3631 rollingAvg_.push_back(avgBytes);
3632
3633 auto const totalBytes =
3634 std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3635 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3636
3637 intervalStart_ = clock_type::now();
3638 accumBytes_ = 0;
3639 }
3640}
3641
3643PeerImp::Metrics::average_bytes() const
3644{
3645 std::shared_lock lock{mutex_};
3646 return rollingAvgBytes_;
3647}
3648
3650PeerImp::Metrics::total_bytes() const
3651{
3652 std::shared_lock lock{mutex_};
3653 return totalBytes_;
3654}
3655
3656} // namespace ripple
T accumulate(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:149
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:75
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:68
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Definition Journal.h:314
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
Definition Cluster.cpp:83
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:49
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
Definition Cluster.cpp:57
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:38
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:248
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:265
int MAX_TRANSACTIONS
Definition Config.h:226
std::chrono::seconds MAX_DIVERGED_TIME
Definition Config.h:284
std::chrono::seconds MAX_UNKNOWN_TIME
Definition Config.h:281
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:179
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:142
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:55
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
Definition PeerImp.h:118
std::queue< std::shared_ptr< Message > > send_queue_
Definition PeerImp.h:241
std::unique_ptr< LoadEvent > load_event_
Definition PeerImp.h:256
boost::beast::http::fields const & headers_
Definition PeerImp.h:240
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Definition PeerImp.cpp:1160
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
Definition PeerImp.cpp:527
clock_type::duration uptime() const
Definition PeerImp.h:441
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
Definition PeerImp.cpp:343
protocol::TMStatusChange last_status_
Definition PeerImp.h:233
std::string name_
Definition PeerImp.h:165
boost::circular_buffer< uint256 > recentTxSets_
Definition PeerImp.h:176
std::unique_ptr< stream_type > stream_ptr_
Definition PeerImp.h:141
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Definition PeerImp.cpp:1169
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Definition PeerImp.h:121
Compressed compressionEnabled_
Definition PeerImp.h:261
uint256 closedLedgerHash_
Definition PeerImp.h:172
std::string domain() const
Definition PeerImp.cpp:902
std::optional< std::uint32_t > lastPingSeq_
Definition PeerImp.h:179
beast::Journal const journal_
Definition PeerImp.h:139
virtual void run()
Definition PeerImp.cpp:161
struct ripple::PeerImp::@21 metrics_
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
Definition PeerImp.cpp:622
LedgerIndex maxLedger_
Definition PeerImp.h:171
beast::Journal const p_journal_
Definition PeerImp.h:140
bool const inbound_
Definition PeerImp.h:156
PeerImp(PeerImp const &)=delete
Application & app_
Definition PeerImp.h:135
void stop() override
Definition PeerImp.cpp:219
void shutdown()
Initiates the peer disconnection sequence.
Definition PeerImp.cpp:646
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
Definition PeerImp.cpp:569
bool hasTxSet(uint256 const &hash) const override
Definition PeerImp.cpp:551
clock_type::time_point lastPingTime_
Definition PeerImp.h:180
void onMessageUnknown(std::uint16_t type)
Definition PeerImp.cpp:1111
std::shared_ptr< PeerFinder::Slot > const slot_
Definition PeerImp.h:236
std::shared_mutex nameMutex_
Definition PeerImp.h:166
boost::circular_buffer< uint256 > recentLedgers_
Definition PeerImp.h:175
id_t const id_
Definition PeerImp.h:136
std::optional< std::chrono::milliseconds > latency_
Definition PeerImp.h:178
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
Definition PeerImp.cpp:1361
beast::IP::Endpoint const remote_address_
Definition PeerImp.h:151
Json::Value json() override
Definition PeerImp.cpp:392
PublicKey const publicKey_
Definition PeerImp.h:164
void close()
Forcibly closes the underlying socket connection.
Definition PeerImp.cpp:689
hash_set< uint256 > txQueue_
Definition PeerImp.h:266
std::mutex recentLock_
Definition PeerImp.h:232
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Definition PeerImp.cpp:1117
bool txReduceRelayEnabled_
Definition PeerImp.h:268
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
Definition PeerImp.cpp:579
clock_type::time_point trackingTime_
Definition PeerImp.h:162
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
Definition PeerImp.cpp:801
bool shutdownStarted_
Definition PeerImp.h:247
socket_type & socket_
Definition PeerImp.h:142
ProtocolVersion protocol_
Definition PeerImp.h:159
reduce_relay::Squelch< UptimeClock > squelch_
Definition PeerImp.h:183
bool writePending_
Definition PeerImp.h:253
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
Definition PeerImp.cpp:384
uint256 previousLedgerHash_
Definition PeerImp.h:173
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
Definition PeerImp.cpp:355
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
Definition PeerImp.cpp:734
void send(std::shared_ptr< Message > const &m) override
Definition PeerImp.cpp:239
static std::string makePrefix(id_t id)
Definition PeerImp.cpp:814
std::string name() const
Definition PeerImp.cpp:895
boost::system::error_code error_code
Definition PeerImp.h:125
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:954
bool ledgerReplayEnabled_
Definition PeerImp.h:270
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
Definition PeerImp.h:132
bool crawl() const
Returns true if this connection will publicly share its IP address.
Definition PeerImp.cpp:369
waitable_timer timer_
Definition PeerImp.h:147
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
Definition PeerImp.cpp:715
void sendTxQueue() override
Send aggregated transactions' hashes.
Definition PeerImp.cpp:307
bool txReduceRelayEnabled() const override
Definition PeerImp.h:511
bool supportsFeature(ProtocolFeature f) const override
Definition PeerImp.cpp:510
ChargeWithContext fee_
Definition PeerImp.h:235
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:1045
http_request_type request_
Definition PeerImp.h:238
OverlayImpl & overlay_
Definition PeerImp.h:155
LedgerIndex minLedger_
Definition PeerImp.h:170
virtual ~PeerImp()
Definition PeerImp.cpp:138
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
Definition PeerImp.cpp:326
stream_type & stream_
Definition PeerImp.h:143
bool cluster() const override
Returns true if this connection is a member of the cluster.
Definition PeerImp.cpp:378
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
Definition PeerImp.cpp:663
boost::asio::strand< boost::asio::executor > strand_
Definition PeerImp.h:144
void cycleStatus() override
Definition PeerImp.cpp:559
boost::beast::multi_buffer read_buffer_
Definition PeerImp.h:237
Resource::Consumer usage_
Definition PeerImp.h:234
bool readPending_
Definition PeerImp.h:250
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
Definition PeerImp.cpp:542
void doProtocolStart()
Definition PeerImp.cpp:912
std::atomic< Tracking > tracking_
Definition PeerImp.h:161
Represents a peer connection in the overlay.
A public key.
Definition PublicKey.h:61
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
A consumption charge.
Definition Charge.h:30
An endpoint that consumes resources.
Definition Consumer.h:35
int balance()
Returns the credit balance representing consumption.
Definition Consumer.cpp:137
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition Consumer.cpp:124
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
Definition Consumer.cpp:106
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:99
std::size_t size() const noexcept
Definition Serializer.h:72
void const * data() const noexcept
Definition Serializer.h:78
int getLength() const
Definition Serializer.h:233
void const * getDataPtr() const
Definition Serializer.h:223
An immutable linear range of bytes.
Definition Slice.h:46
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:64
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
Definition base_uint.h:526
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition base_uint.h:503
T emplace_back(T... args)
T empty(T... args)
T find(T... args)
T for_each(T... args)
T get(T... args)
T is_same_v
T load(T... args)
T lock(T... args)
T max(T... args)
T min(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
unsigned int UInt
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:187
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:820
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
static constexpr char FEATURE_COMPR[]
Definition Handshake.h:141
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
Definition PeerImp.cpp:155
static constexpr char FEATURE_LEDGER_REPLAY[]
Definition Handshake.h:147
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
HashRouterFlags
Definition HashRouter.h:34
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:30
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
Definition PeerImp.cpp:3197
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:244
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:119
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:33
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
Definition PeerImp.cpp:3173
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
Definition Handshake.h:198
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
Definition apply.cpp:118
static constexpr char FEATURE_TXRR[]
Definition Handshake.h:145
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
std::optional< Rules > const & getCurrentTransactionRules()
Definition Rules.cpp:47
Number root(Number f, unsigned d)
Definition Number.cpp:636
@ manifest
Manifest.
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:44
@ jtLEDGER_REQ
Definition Job.h:59
@ jtPROPOSAL_ut
Definition Job.h:60
@ jtREPLAY_REQ
Definition Job.h:58
@ jtTRANSACTION
Definition Job.h:62
@ jtPEER
Definition Job.h:80
@ jtREQUESTED_TXN
Definition Job.h:64
@ jtMISSING_TXN
Definition Job.h:63
@ jtVALIDATION_t
Definition Job.h:71
@ jtMANIFEST
Definition Job.h:55
@ jtTXN_DATA
Definition Job.h:69
@ jtPACK
Definition Job.h:43
@ jtVALIDATION_ut
Definition Job.h:54
@ jtPROPOSAL_t
Definition Job.h:74
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:224
static constexpr char FEATURE_VPRR[]
Definition Handshake.h:143
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:61
STL namespace.
T nth_element(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T setfill(T... args)
T setw(T... args)
T size(T... args)
T str(T... args)
Information about the notional ledger backing the view.
beast::IP::Address public_ip
Definition Overlay.h:69
std::optional< std::uint32_t > networkID
Definition Overlay.h:72
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Definition PeerImp.h:218
Describes a single consumer.
Definition Gossip.h:37
beast::IP::Endpoint address
Definition Gossip.h:41
Data format for exchanging consumption information across peers.
Definition Gossip.h:32
std::vector< Item > items
Definition Gossip.h:44
T tie(T... args)
T to_string(T... args)
T what(T... args)