// PeerImp.cpp — ripple overlay peer implementation.
// NOTE(review): the non-code lines previously here ("Loading...",
// "Searching...", "No Matches") were code-viewer residue left by extraction.
1#include <xrpld/app/consensus/RCLValidations.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionMaster.h>
6#include <xrpld/app/misc/HashRouter.h>
7#include <xrpld/app/misc/LoadFeeTrack.h>
8#include <xrpld/app/misc/NetworkOPs.h>
9#include <xrpld/app/misc/Transaction.h>
10#include <xrpld/app/misc/ValidatorList.h>
11#include <xrpld/app/tx/apply.h>
12#include <xrpld/overlay/Cluster.h>
13#include <xrpld/overlay/detail/PeerImp.h>
14#include <xrpld/overlay/detail/Tuning.h>
15#include <xrpld/perflog/PerfLog.h>
16
17#include <xrpl/basics/UptimeClock.h>
18#include <xrpl/basics/base64.h>
19#include <xrpl/basics/random.h>
20#include <xrpl/basics/safe_cast.h>
21#include <xrpl/protocol/TxFlags.h>
22#include <xrpl/protocol/digest.h>
23
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/beast/core/ostream.hpp>
26
27#include <algorithm>
28#include <chrono>
29#include <memory>
30#include <mutex>
31#include <numeric>
32#include <sstream>
33
34using namespace std::chrono_literals;
35
36namespace ripple {
37
namespace {
// Latency threshold for classifying a peer as slow.
// NOTE(review): not referenced in the visible portion of this file —
// confirm its use sites before changing.
std::chrono::milliseconds constexpr peerHighLatency{300};

// How often the periodic peer maintenance timer fires (see onTimer:
// ping keepalive, tracking deadlines, send-queue checks).
std::chrono::seconds constexpr peerTimerInterval{60};

// How long a graceful SSL shutdown may run before the timer forces the
// connection closed (armed in tryAsyncShutdown, enforced in onTimer).
std::chrono::seconds constexpr shutdownTimerInterval{5};

} // namespace
49
50// TODO: Remove this exclusion once unit tests are added after the hotfix
51// release.
52
54 Application& app,
55 id_t id,
57 http_request_type&& request,
58 PublicKey const& publicKey,
60 Resource::Consumer consumer,
62 OverlayImpl& overlay)
63 : Child(overlay)
64 , app_(app)
65 , id_(id)
66 , fingerprint_(
67 getFingerprint(slot->remote_endpoint(), publicKey, to_string(id)))
68 , prefix_(makePrefix(fingerprint_))
69 , sink_(app_.journal("Peer"), prefix_)
70 , p_sink_(app_.journal("Protocol"), prefix_)
71 , journal_(sink_)
72 , p_journal_(p_sink_)
73 , stream_ptr_(std::move(stream_ptr))
74 , socket_(stream_ptr_->next_layer().socket())
75 , stream_(*stream_ptr_)
76 , strand_(boost::asio::make_strand(socket_.get_executor()))
77 , timer_(waitable_timer{socket_.get_executor()})
78 , remote_address_(slot->remote_endpoint())
79 , overlay_(overlay)
80 , inbound_(true)
81 , protocol_(protocol)
82 , tracking_(Tracking::unknown)
83 , trackingTime_(clock_type::now())
84 , publicKey_(publicKey)
85 , lastPingTime_(clock_type::now())
86 , creationTime_(clock_type::now())
87 , squelch_(app_.journal("Squelch"))
88 , usage_(consumer)
89 , fee_{Resource::feeTrivialPeer, ""}
90 , slot_(slot)
91 , request_(std::move(request))
92 , headers_(request_)
93 , compressionEnabled_(
95 headers_,
97 "lz4",
98 app_.config().COMPRESSION)
99 ? Compressed::On
100 : Compressed::Off)
101 , txReduceRelayEnabled_(peerFeatureEnabled(
102 headers_,
104 app_.config().TX_REDUCE_RELAY_ENABLE))
105 , ledgerReplayEnabled_(peerFeatureEnabled(
106 headers_,
108 app_.config().LEDGER_REPLAY))
109 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
110{
111 JLOG(journal_.info())
112 << "compression enabled " << (compressionEnabled_ == Compressed::On)
113 << " vp reduce-relay base squelch enabled "
115 headers_,
118 << " tx reduce-relay enabled " << txReduceRelayEnabled_;
119}
120
122{
123 bool const inCluster{cluster()};
124
129
130 if (inCluster)
131 {
132 JLOG(journal_.warn()) << name() << " left cluster";
133 }
134}
135
136// Helper function to check for valid uint256 values in protobuf buffers
137static bool
139{
140 return pBuffStr.size() == uint256::size();
141}
142
void
{
    // Post-handshake entry point: parses the optional Closed-Ledger /
    // Previous-Ledger handshake headers, then starts the protocol.
    // NOTE(review): this copy lost the declarator, the re-post to the
    // strand, the lambda header of parseLedgerHash, and the declaration
    // of 'closed' — restore from upstream before building.
    if (!strand_.running_in_this_thread())

    auto parseLedgerHash =
        // Accepts a 256-bit hash encoded either as hex or as base64.
        if (uint256 ret; ret.parseHex(value))
            return ret;

        if (auto const s = base64_decode(value); s.size() == uint256::size())
            return uint256{s};

        return std::nullopt;
    };

    std::optional<uint256> previous;

    if (auto const iter = headers_.find("Closed-Ledger");
        iter != headers_.end())
    {
        closed = parseLedgerHash(iter->value());

        if (!closed)
            fail("Malformed handshake data (1)");
    }

    if (auto const iter = headers_.find("Previous-Ledger");
        iter != headers_.end())
    {
        previous = parseLedgerHash(iter->value());

        if (!previous)
            fail("Malformed handshake data (2)");
    }

    // A previous ledger without a closed ledger is contradictory.
    if (previous && !closed)
        fail("Malformed handshake data (3)");

    {
        // presumably guarded by a lock here — the lock statement was lost
        // in extraction; verify against upstream.
        if (closed)
            closedLedgerHash_ = *closed;
        if (previous)
            previousLedgerHash_ = *previous;
    }

    if (inbound_)
        doAccept();
    else

    // Anything else that needs to be done with the connection should be
    // done in doProtocolStart
}
200
void
{
    // Stop this peer: marshalled onto the strand, then begins a graceful
    // shutdown if the socket is still open.
    if (!strand_.running_in_this_thread())

    if (!socket_.is_open())
        return;

    // The rationale for using different severity levels is that
    // outbound connections are under our control and may be logged
    // at a higher level, but inbound connections are more numerous and
    // uncontrolled so to prevent log flooding the severity is reduced.
    // NOTE(review): the paragraph above describes close(); the log here
    // is unconditionally debug-level.
    JLOG(journal_.debug()) << "stop: Stop";

    shutdown();
}
218
219//------------------------------------------------------------------------------
220
void
{
    // Queue a message for asynchronous delivery. Only the queue front is
    // ever being written, so a new async_write is started only when the
    // queue was previously empty.
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));

    if (!socket_.is_open())
        return;

    // we are in progress of closing the connection
    if (shutdown_)
        return tryAsyncShutdown();

    // Drop (but still account for) messages from squelched validators.
    auto validator = m->getValidatorKey();
    if (validator && !squelch_.expireSquelch(*validator))
    {
        // NOTE(review): the traffic-accounting call that consumes this
        // size argument was lost in extraction.
            static_cast<int>(m->getBuffer(compressionEnabled_).size()));
        return;
    }

    // report categorized outgoing traffic
    // NOTE(review): the reporting call header was lost in extraction.
        safe_cast<TrafficCount::category>(m->getCategory()),
        static_cast<int>(m->getBuffer(compressionEnabled_).size()));

    // report total outgoing traffic
    // NOTE(review): the reporting call header was lost in extraction.
        static_cast<int>(m->getBuffer(compressionEnabled_).size()));

    auto sendq_size = send_queue_.size();

    if (sendq_size < Tuning::targetSendQueue)
    {
        // To detect a peer that does not read from their
        // side of the connection, we expect a peer to have
        // a small senq periodically
        large_sendq_ = 0;
    }
    else if (auto sink = journal_.debug();
             sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
    {
        std::string const n = name();
        sink << n << " sendq: " << sendq_size;
    }

    send_queue_.push(m);

    // Non-zero size means a write for the queue front is already in
    // flight; onWriteMessage will continue draining.
    if (sendq_size != 0)
        return;

    writePending_ = true;
    boost::asio::async_write(
        stream_,
        boost::asio::buffer(
            send_queue_.front()->getBuffer(compressionEnabled_)),
        bind_executor(
            strand_,
            std::bind(
                // NOTE(review): the bind target (presumably
                // &PeerImp::onWriteMessage, shared_from_this()) was lost
                // in extraction.
                std::placeholders::_1,
                std::placeholders::_2)));
}
287
void
{
    // Flush all queued transaction hashes to the peer as one
    // TMHaveTransactions message (tx reduce-relay).
    if (!strand_.running_in_this_thread())
        return post(
        // NOTE(review): the bind arguments were lost in extraction.

    if (!txQueue_.empty())
    {
        protocol::TMHaveTransactions ht;
        std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
            ht.add_hashes(hash.data(), hash.size());
        });
        JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
        txQueue_.clear();
        send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
    }
}
306
void
{
    // Queue a transaction hash for batched relay to this peer, flushing
    // first when the queue is at capacity.
    if (!strand_.running_in_this_thread())
        return post(
        // NOTE(review): the bind arguments were lost in extraction.

    // NOTE(review): the capacity condition guarding this block was lost
    // in extraction — restore from upstream.
    {
        JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
        sendTxQueue();
    }

    txQueue_.insert(hash);
    JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
}
323
void
{
    // Remove a transaction hash from the relay queue (e.g. once the peer
    // is known to already have the transaction).
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            // NOTE(review): the bind expression was lost in extraction.

    auto removed = txQueue_.erase(hash);
    JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
}
335
void
{
    // Charge this peer's resource consumer; if usage crosses the drop
    // threshold the connection is failed (only when on the strand).
    if ((usage_.charge(fee, context) == Resource::drop) &&
        usage_.disconnect(p_journal_) && strand_.running_in_this_thread())
    {
        // Sever the connection
        // NOTE(review): a statement before fail() was lost in extraction.
        fail("charge: Resources");
    }
}
347
348//------------------------------------------------------------------------------
349
350bool
352{
353 auto const iter = headers_.find("Crawl");
354 if (iter == headers_.end())
355 return false;
356 return boost::iequals(iter->value(), "public");
357}
358
359bool
361{
362 return static_cast<bool>(app_.cluster().member(publicKey_));
363}
364
367{
368 if (inbound_)
369 return headers_["User-Agent"];
370 return headers_["Server"];
371}
372
375{
377
378 ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
379 ret[jss::address] = remote_address_.to_string();
380
381 if (inbound_)
382 ret[jss::inbound] = true;
383
384 if (cluster())
385 {
386 ret[jss::cluster] = true;
387
388 if (auto const n = name(); !n.empty())
389 // Could move here if Json::Value supported moving from a string
390 ret[jss::name] = n;
391 }
392
393 if (auto const d = domain(); !d.empty())
394 ret[jss::server_domain] = std::string{d};
395
396 if (auto const nid = headers_["Network-ID"]; !nid.empty())
397 ret[jss::network_id] = std::string{nid};
398
399 ret[jss::load] = usage_.balance();
400
401 if (auto const version = getVersion(); !version.empty())
402 ret[jss::version] = std::string{version};
403
404 ret[jss::protocol] = to_string(protocol_);
405
406 {
408 if (latency_)
409 ret[jss::latency] = static_cast<Json::UInt>(latency_->count());
410 }
411
412 ret[jss::uptime] = static_cast<Json::UInt>(
413 std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());
414
415 std::uint32_t minSeq, maxSeq;
416 ledgerRange(minSeq, maxSeq);
417
418 if ((minSeq != 0) || (maxSeq != 0))
419 ret[jss::complete_ledgers] =
420 std::to_string(minSeq) + " - " + std::to_string(maxSeq);
421
422 switch (tracking_.load())
423 {
425 ret[jss::track] = "diverged";
426 break;
427
429 ret[jss::track] = "unknown";
430 break;
431
433 // Nothing to do here
434 break;
435 }
436
437 uint256 closedLedgerHash;
438 protocol::TMStatusChange last_status;
439 {
441 closedLedgerHash = closedLedgerHash_;
442 last_status = last_status_;
443 }
444
445 if (closedLedgerHash != beast::zero)
446 ret[jss::ledger] = to_string(closedLedgerHash);
447
448 if (last_status.has_newstatus())
449 {
450 switch (last_status.newstatus())
451 {
452 case protocol::nsCONNECTING:
453 ret[jss::status] = "connecting";
454 break;
455
456 case protocol::nsCONNECTED:
457 ret[jss::status] = "connected";
458 break;
459
460 case protocol::nsMONITORING:
461 ret[jss::status] = "monitoring";
462 break;
463
464 case protocol::nsVALIDATING:
465 ret[jss::status] = "validating";
466 break;
467
468 case protocol::nsSHUTTING:
469 ret[jss::status] = "shutting";
470 break;
471
472 default:
473 JLOG(p_journal_.warn())
474 << "Unknown status: " << last_status.newstatus();
475 }
476 }
477
478 ret[jss::metrics] = Json::Value(Json::objectValue);
479 ret[jss::metrics][jss::total_bytes_recv] =
480 std::to_string(metrics_.recv.total_bytes());
481 ret[jss::metrics][jss::total_bytes_sent] =
482 std::to_string(metrics_.sent.total_bytes());
483 ret[jss::metrics][jss::avg_bps_recv] =
484 std::to_string(metrics_.recv.average_bytes());
485 ret[jss::metrics][jss::avg_bps_sent] =
486 std::to_string(metrics_.sent.average_bytes());
487
488 return ret;
489}
490
bool
{
    // Maps a protocol feature to the minimum negotiated protocol version
    // that supports it; unknown features are unsupported.
    // NOTE(review): the declarator and switch case labels were lost in
    // extraction — restore from upstream before building.
    switch (f)
    {
            return protocol_ >= make_protocol(2, 1);
            return protocol_ >= make_protocol(2, 2);
    }
    return false;
}
505
506//------------------------------------------------------------------------------
507
bool
{
    // True when the peer is believed to have the given ledger, either
    // because it falls in the advertised sequence range or because its
    // hash was recently advertised.
    {
        // NOTE(review): the lock statement and the trailing clause of the
        // range check were lost in extraction — restore from upstream.
        if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
            return true;
        if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
            recentLedgers_.end())
            return true;
    }
    return false;
}
522
void
{
    // Reports the peer's advertised complete-ledger range via the two
    // out-parameters. NOTE(review): the declarator and a lock statement
    // were lost in extraction.
    minSeq = minLedger_;
    maxSeq = maxLedger_;
}
531
bool
PeerImp::hasTxSet(uint256 const& hash) const
{
    // True when the peer recently advertised this transaction set.
    // NOTE(review): a statement (presumably a lock) was lost in
    // extraction before the return.
    return std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
        recentTxSets_.end();
}
539
void
{
    // Operations on closedLedgerHash_ and previousLedgerHash_ must be
    // guarded by recentLock_.
    // NOTE(review): the declarator and the entire function body were lost
    // in extraction — restore from upstream before building.
}
549
bool
{
    // True when the peer is usefully tracking the network and its
    // advertised ledger range covers [uMin, uMax].
    // NOTE(review): the declarator and a lock statement were lost in
    // extraction.
    return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) &&
        (uMax <= maxLedger_);
}
557
558//------------------------------------------------------------------------------
559
560void
562{
563 XRPL_ASSERT(
564 strand_.running_in_this_thread(),
565 "ripple::PeerImp::fail : strand in this thread");
566
567 if (!socket_.is_open())
568 return;
569
570 JLOG(journal_.warn()) << name << ": " << ec.message();
571
572 shutdown();
573}
574
void
{
    // Overload taking only a reason string; marshals onto the strand,
    // logs the failure and shuts the connection down.
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(
                (void(Peer::*)(std::string const&)) & PeerImp::fail,
                // NOTE(review): a bind argument (presumably
                // shared_from_this()) was lost in extraction.
                reason));

    if (!socket_.is_open())
        return;

    // Call to name() locks, log only if the message will be output
    // NOTE(review): the severity-check condition guarding this block was
    // lost in extraction.
    {
        std::string const n = name();
        JLOG(journal_.warn()) << n << " failed: " << reason;
    }

    shutdown();
}
598
void
{
    // Starts the asynchronous SSL shutdown handshake exactly once, armed
    // with a watchdog timer so a stalled peer cannot hold us open.
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::tryAsyncShutdown : strand in this thread");

    // NOTE(review): the two guard conditions preceding these returns were
    // lost in extraction — restore from upstream before building.
        return;

        return;

    shutdownStarted_ = true;

    setTimer(shutdownTimerInterval);

    // gracefully shutdown the SSL socket, performing a shutdown handshake
    stream_.async_shutdown(bind_executor(
        strand_,
        std::bind(
            &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
}
622
void
{
    // Marks the connection as shutting down and cancels outstanding I/O
    // so pending handlers complete promptly; idempotent.
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::shutdown: strand in this thread");

    if (!socket_.is_open() || shutdown_)
        return;

    shutdown_ = true;

    boost::beast::get_lowest_layer(stream_).cancel();

    // NOTE(review): the final statement (presumably tryAsyncShutdown())
    // was lost in extraction.
}
639
640void
642{
643 cancelTimer();
644 if (ec)
645 {
646 // - eof: the stream was cleanly closed
647 // - operation_aborted: an expired timer (slow shutdown)
648 // - stream_truncated: the tcp connection closed (no handshake) it could
649 // occur if a peer does not perform a graceful disconnect
650 // - broken_pipe: the peer is gone
651 bool shouldLog =
652 (ec != boost::asio::error::eof &&
653 ec != boost::asio::error::operation_aborted &&
654 ec.message().find("application data after close notify") ==
655 std::string::npos);
656
657 if (shouldLog)
658 {
659 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
660 }
661 }
662
663 close();
664}
665
void
{
    // Force-closes the socket and stops the timer; the terminal step of
    // every connection teardown path.
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::close : strand in this thread");

    if (!socket_.is_open())
        return;

    cancelTimer();

    // Errors from close() are deliberately ignored.
    error_code ec;
    socket_.close(ec);

    // NOTE(review): a statement after close() was lost in extraction.

    // The rationale for using different severity levels is that
    // outbound connections are under our control and may be logged
    // at a higher level, but inbound connections are more numerous and
    // uncontrolled so to prevent log flooding the severity is reduced.
    JLOG((inbound_ ? journal_.debug() : journal_.info())) << "close: Closed";
}
689
690//------------------------------------------------------------------------------
691
692void
694{
695 try
696 {
697 timer_.expires_after(interval);
698 }
699 catch (std::exception const& ex)
700 {
701 JLOG(journal_.error()) << "setTimer: " << ex.what();
702 return shutdown();
703 }
704
705 timer_.async_wait(bind_executor(
706 strand_,
707 std::bind(
708 &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
709}
710
711//------------------------------------------------------------------------------
712
715{
717 ss << "[" << fingerprint << "] ";
718 return ss.str();
719}
720
void
{
    // Periodic peer maintenance: shutdown watchdog, send-queue limit,
    // tracking-convergence deadlines and ping keepalives.
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::onTimer : strand in this thread");

    if (!socket_.is_open())
        return;

    if (ec)
    {
        // do not initiate shutdown, timers are frequently cancelled
        if (ec == boost::asio::error::operation_aborted)
            return;

        // This should never happen
        JLOG(journal_.error()) << "onTimer: " << ec.message();
        return close();
    }

    // the timer expired before the shutdown completed
    // force close the connection
    if (shutdown_)
    {
        JLOG(journal_.debug()) << "onTimer: shutdown timer expired";
        return close();
    }

    // NOTE(review): the persistent-large-send-queue condition guarding
    // this return was lost in extraction.
        return fail("Large send queue");

    // Outbound peers that never converge within the configured windows
    // are abandoned as not useful.
    if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
    {
        clock_type::duration duration;

        {
            // presumably under a lock — the lock statement was lost in
            // extraction.
            duration = clock_type::now() - trackingTime_;
        }

        if ((t == Tracking::diverged &&
             (duration > app_.config().MAX_DIVERGED_TIME)) ||
            (t == Tracking::unknown &&
             (duration > app_.config().MAX_UNKNOWN_TIME)))
        {
            // NOTE(review): a statement before fail() was lost in
            // extraction.
            return fail("Not useful");
        }
    }

    // Already waiting for PONG
    if (lastPingSeq_)
        return fail("Ping Timeout");

    // NOTE(review): a statement (presumably recording lastPingTime_) was
    // lost in extraction.
    lastPingSeq_ = rand_int<std::uint32_t>();

    protocol::TMPing message;
    message.set_type(protocol::TMPing::ptPING);
    message.set_seq(*lastPingSeq_);

    send(std::make_shared<Message>(message, protocol::mtPING));

    setTimer(peerTimerInterval);
}
787
void
// NOTE(review): the declarator line was lost in extraction — this is
// PeerImp::cancelTimer() per the log tag below.
{
    // Best-effort cancellation of the peer timer; cancel() may throw and
    // failures are logged but never propagated.
    try
    {
        timer_.cancel();
    }
    catch (std::exception const& ex)
    {
        JLOG(journal_.error()) << "cancelTimer: " << ex.what();
    }
}
800
801//------------------------------------------------------------------------------
void
{
    // Inbound path: writes the HTTP handshake response and, once fully
    // written, starts the peer protocol.
    XRPL_ASSERT(
        read_buffer_.size() == 0,
        "ripple::PeerImp::doAccept : empty read buffer");

    JLOG(journal_.debug()) << "doAccept";

    // a shutdown was initiated before the handshake, there is nothing to do
    if (shutdown_)
        return tryAsyncShutdown();

    auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);

    // This shouldn't fail since we already computed
    // the shared value successfully in OverlayImpl
    if (!sharedValue)
        return fail("makeSharedValue: Unexpected failure");

    JLOG(journal_.debug()) << "Protocol: " << to_string(protocol_);

    if (auto member = app_.cluster().member(publicKey_))
    {
        {
            // presumably writes name_ under an exclusive lock on
            // nameMutex_ — the lock statement was lost in extraction.
            name_ = *member;
        }
        JLOG(journal_.info()) << "Cluster name: " << *member;
    }

    // NOTE(review): one or more statements were lost in extraction here.

    // XXX Set timer: connection is in grace period to be useful.
    // XXX Set timer: connection idle (idle may vary depending on connection
    // type.)

    // NOTE(review): the declaration of 'write_buffer' and several
    // makeResponse() arguments were lost in extraction — restore from
    // upstream before building.

    boost::beast::ostream(*write_buffer) << makeResponse(
        request_,
        *sharedValue,
        protocol_,
        app_);

    // Write the whole buffer and only start protocol when that's done.
    boost::asio::async_write(
        stream_,
        write_buffer->data(),
        boost::asio::transfer_all(),
        bind_executor(
            strand_,
            [this, write_buffer, self = shared_from_this()](
                error_code ec, std::size_t bytes_transferred) {
                if (!socket_.is_open())
                    return;
                if (ec == boost::asio::error::operation_aborted)
                    return tryAsyncShutdown();
                if (ec)
                    return fail("onWriteResponse", ec);
                if (write_buffer->size() == bytes_transferred)
                    return doProtocolStart();
                return fail("Failed to write header");
            }));
}
871
874{
875 std::shared_lock read_lock{nameMutex_};
876 return name_;
877}
878
881{
882 return headers_["Server-Domain"];
883}
884
885//------------------------------------------------------------------------------
886
887// Protocol logic
888
void
{
    // Starts the peer protocol: pushes loaded validator lists and cached
    // manifests to the peer and arms the maintenance timer.
    // a shutdown was initiated before the handshake, there is nothing to do
    if (shutdown_)
        return tryAsyncShutdown();

    // NOTE(review): a statement (presumably starting the first read) was
    // lost in extraction here.

    // Send all the validator lists that have been loaded
    {
        // NOTE(review): the enclosing call and the callee invoked inside
        // this lambda were lost in extraction — restore from upstream.
            [&](std::string const& manifest,
                std::uint32_t version,
                PublicKey const& pubKey,
                std::size_t maxSequence,
                uint256 const& hash) {
                    *this,
                    0,
                    pubKey,
                    maxSequence,
                    version,
                    manifest,
                    blobInfos,
                    p_journal_);

                // Don't send it next time.
            });
    }

    if (auto m = overlay_.getManifestsMessage())
        send(m);

    setTimer(peerTimerInterval);
}
929
930// Called repeatedly with protocol message data
void
{
    // Completion handler for async reads: feeds buffered bytes through
    // invokeProtocolMessage until no full message remains, then re-arms
    // the next read.
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::onReadMessage : strand in this thread");

    readPending_ = false;

    if (!socket_.is_open())
        return;

    if (ec)
    {
        if (ec == boost::asio::error::eof)
        {
            JLOG(journal_.debug()) << "EOF";
            return shutdown();
        }

        if (ec == boost::asio::error::operation_aborted)
            return tryAsyncShutdown();

        return fail("onReadMessage", ec);
    }
    // we started shutdown, no reason to process further data
    if (shutdown_)
        return tryAsyncShutdown();

    if (auto stream = journal_.trace())
    {
        stream << "onReadMessage: "
               << (bytes_transferred > 0
                       ? to_string(bytes_transferred) + " bytes"
                       : "");
    }

    metrics_.recv.add_message(bytes_transferred);

    read_buffer_.commit(bytes_transferred);

    auto hint = Tuning::readBufferBytes;

    while (read_buffer_.size() > 0)
    {
        std::size_t bytes_consumed;

        // Dispatch one message, logging a warning if handling exceeds
        // the 350ms budget.
        using namespace std::chrono_literals;
        std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
            [&]() {
                return invokeProtocolMessage(read_buffer_.data(), *this, hint);
            },
            "invokeProtocolMessage",
            350ms,
            journal_);

        if (!socket_.is_open())
            return;

        // the error_code is produced by invokeProtocolMessage
        // it could be due to a bad message
        if (ec)
            return fail("onReadMessage", ec);

        // Zero consumed means only a partial message remains buffered.
        if (bytes_consumed == 0)
            break;

        read_buffer_.consume(bytes_consumed);
    }

    // check if a shutdown was initiated while processing messages
    if (shutdown_)
        return tryAsyncShutdown();

    readPending_ = true;

    XRPL_ASSERT(
        !shutdownStarted_, "ripple::PeerImp::onReadMessage : shutdown started");

    // Timeout on writes only
    // NOTE(review): the buffer argument and the bind target of this read
    // were lost in extraction — restore from upstream.
    stream_.async_read_some(
        bind_executor(
            strand_,
            std::bind(
                std::placeholders::_1,
                std::placeholders::_2)));
}
1021
void
{
    // Completion handler for async writes: pops the delivered message and
    // starts writing the next queued one, if any.
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::onWriteMessage : strand in this thread");

    writePending_ = false;

    if (!socket_.is_open())
        return;

    if (ec)
    {
        if (ec == boost::asio::error::operation_aborted)
            return tryAsyncShutdown();

        return fail("onWriteMessage", ec);
    }

    if (auto stream = journal_.trace())
    {
        stream << "onWriteMessage: "
               << (bytes_transferred > 0
                       ? to_string(bytes_transferred) + " bytes"
                       : "");
    }

    metrics_.sent.add_message(bytes_transferred);

    XRPL_ASSERT(
        !send_queue_.empty(),
        "ripple::PeerImp::onWriteMessage : non-empty send buffer");
    send_queue_.pop();

    if (shutdown_)
        return tryAsyncShutdown();

    if (!send_queue_.empty())
    {
        writePending_ = true;
        XRPL_ASSERT(
            // NOTE(review): the asserted condition was lost in extraction.
            "ripple::PeerImp::onWriteMessage : shutdown started");

        // Timeout on writes only
        return boost::asio::async_write(
            stream_,
            boost::asio::buffer(
                send_queue_.front()->getBuffer(compressionEnabled_)),
            bind_executor(
                strand_,
                std::bind(
                    // NOTE(review): the bind target was lost in
                    // extraction.
                    std::placeholders::_1,
                    std::placeholders::_2)));
    }
}
1081
1082//------------------------------------------------------------------------------
1083//
1084// ProtocolHandler
1085//
1086//------------------------------------------------------------------------------
1087
void
{
    // Intentionally a no-op: unknown message types are ignored.
    // TODO
}
1093
void
    std::uint16_t type,
    std::size_t size,
    std::size_t uncompressed_size,
    bool isCompressed)
{
    // Accounting hook invoked before each protocol message is dispatched:
    // records total and per-category inbound traffic.
    // NOTE(review): the declarator, the message parameter, a reporting
    // call header and parts of the type-filter condition were lost in
    // extraction — restore from upstream before building.
    auto const name = protocolMessageName(type);

    auto const category = TrafficCount::categorize(
        *m, static_cast<protocol::MessageType>(type), true);

    // report total incoming traffic
        TrafficCount::category::total, static_cast<int>(size));

    // increase the traffic received for a specific category
    overlay_.reportInboundTraffic(category, static_cast<int>(size));

    using namespace protocol;
    if ((type == MessageType::mtTRANSACTION ||
         type == MessageType::mtHAVE_TRANSACTIONS ||
         type == MessageType::mtTRANSACTIONS ||
         // GET_OBJECTS
         // GET_LEDGER
         // LEDGER_DATA
    {
            static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
    }
    JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
                           << uncompressed_size << " " << isCompressed;
}
1136
1137void
1145
void
{
    // Handles a batch of manifests: sanity-checks the batch size, then
    // hands the batch to the overlay on the jtMANIFEST job queue.
    auto const s = m->list_size();

    if (s == 0)
    {
        // NOTE(review): a fee/charge statement was lost in extraction.
        return;
    }

    if (s > 100)
        // NOTE(review): the oversize-batch penalty statement was lost in
        // extraction.

    // NOTE(review): the job-queue call header was lost in extraction.
        jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
            overlay_.onManifests(m, that);
        });
}
1165
void
{
    // Replies to PINGs with a PONG, and uses matching PONGs to maintain a
    // 7/8-weighted moving average of round-trip latency.
    if (m->type() == protocol::TMPing::ptPING)
    {
        // We have received a ping request, reply with a pong
        m->set_type(protocol::TMPing::ptPONG);
        send(std::make_shared<Message>(*m, protocol::mtPING));
        return;
    }

    if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
    {
        // Only reset the ping sequence if we actually received a
        // PONG with the correct cookie. That way, any peers which
        // respond with incorrect cookies will eventually time out.
        if (m->seq() == lastPingSeq_)
        {
            // NOTE(review): statements (presumably resetting lastPingSeq_)
            // were lost in extraction here.

            // Update latency estimate
            auto const rtt = std::chrono::round<std::chrono::milliseconds>(
            // NOTE(review): the rounded expression and a lock statement
            // were lost in extraction.

            if (latency_)
                latency_ = (*latency_ * 7 + rtt) / 8;
            else
                latency_ = rtt;
        }

        return;
    }
}
1202
void
{
    // Processes cluster status reports: updates per-node load data,
    // imports gossip about abusive endpoints, and recomputes the median
    // cluster fee.
    // VFALCO NOTE I think we should drop the peer immediately
    if (!cluster())
    {
        fee_.update(Resource::feeUselessData, "unknown cluster");
        return;
    }

    for (int i = 0; i < m->clusternodes().size(); ++i)
    {
        protocol::TMClusterNode const& node = m->clusternodes(i);

        // NOTE(review): the declaration of 'name' was lost in extraction.
        if (node.has_nodename())
            name = node.nodename();

        auto const publicKey =
            parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());

        // NIKB NOTE We should drop the peer immediately if
        // they send us a public key we can't parse
        if (publicKey)
        {
            auto const reportTime =
                NetClock::time_point{NetClock::duration{node.reporttime()}};

            // NOTE(review): the cluster-update call header was lost in
            // extraction.
                *publicKey, name, node.nodeload(), reportTime);
        }
    }

    int loadSources = m->loadsources().size();
    if (loadSources != 0)
    {
        Resource::Gossip gossip;
        gossip.items.reserve(loadSources);
        for (int i = 0; i < m->loadsources().size(); ++i)
        {
            protocol::TMLoadSource const& node = m->loadsources(i);
            // NOTE(review): the declaration of 'item' was lost in
            // extraction.
            item.address = beast::IP::Endpoint::from_string(node.name());
            item.balance = node.cost();
            if (item.address != beast::IP::Endpoint())
                gossip.items.push_back(item);
        }
        // NOTE(review): the gossip-import statement was lost in
        // extraction.
    }

    // Calculate the cluster fee:
    auto const thresh = app_.timeKeeper().now() - 90s;
    std::uint32_t clusterFee = 0;

    // NOTE(review): the declaration of 'fees' was lost in extraction.
    fees.reserve(app_.cluster().size());

    // Collect fees from nodes that reported recently enough.
    app_.cluster().for_each([&fees, thresh](ClusterNode const& status) {
        if (status.getReportTime() >= thresh)
            fees.push_back(status.getLoadFee());
    });

    // Median of the recent fees.
    if (!fees.empty())
    {
        auto const index = fees.size() / 2;
        std::nth_element(fees.begin(), fees.begin() + index, fees.end());
        clusterFee = fees[index];
    }

    app_.getFeeTrack().setClusterFee(clusterFee);
}
1274
void
{
    // Processes peer endpoint gossip (v2 only, converged peers only) and
    // forwards well-formed endpoints to PeerFinder.
    // Don't allow endpoints from peers that are not known tracking or are
    // not using a version of the message that we support:
    if (tracking_.load() != Tracking::converged || m->version() != 2)
        return;

    // The number is arbitrary and doesn't have any real significance or
    // implication for the protocol.
    if (m->endpoints_v2().size() >= 1024)
    {
        fee_.update(Resource::feeUselessData, "endpoints too large");
        return;
    }

    // NOTE(review): the declaration of 'endpoints' was lost in
    // extraction.
    endpoints.reserve(m->endpoints_v2().size());

    auto malformed = 0;
    for (auto const& tm : m->endpoints_v2())
    {
        auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());

        if (!result)
        {
            JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
                                     << tm.endpoint() << "}";
            malformed++;
            continue;
        }

        // If hops == 0, this Endpoint describes the peer we are connected
        // to -- in that case, we take the remote address seen on the
        // socket and store that in the IP::Endpoint. If this is the first
        // time, then we'll verify that their listener can receive incoming
        // by performing a connectivity test. if hops > 0, then we just
        // take the address/port we were given
        if (tm.hops() == 0)
            result = remote_address_.at_port(result->port());

        endpoints.emplace_back(*result, tm.hops());
    }

    // Charge the peer for each malformed endpoint. As there still may be
    // multiple valid endpoints we don't return early.
    if (malformed > 0)
    {
        fee_.update(
            Resource::feeInvalidData * malformed,
            std::to_string(malformed) + " malformed endpoints");
    }

    if (!endpoints.empty())
        overlay_.peerFinder().on_endpoints(slot_, endpoints);
}
1331
1332void
1337
void
// NOTE(review): the declarator and first parameter (the TMTransaction
// message) were lost in extraction — restore from upstream.
    bool eraseTxQueue,
    bool batch)
{
    // Common handler for relayed transactions: deserializes, filters
    // inner-batch and already-seen transactions, then queues signature
    // checking on the job queue.
    XRPL_ASSERT(
        eraseTxQueue != batch,
        ("ripple::PeerImp::handleTransaction : valid inputs"));
    // NOTE(review): the condition guarding this early return was lost in
    // extraction.
        return;

    // NOTE(review): the condition opening this block was lost in
    // extraction.
    {
        // If we've never been in synch, there's nothing we can do
        // with a transaction
        JLOG(p_journal_.debug()) << "Ignoring incoming transaction: "
                                 << "Need network ledger";
        return;
    }

    SerialIter sit(makeSlice(m->rawtransaction()));

    try
    {
        auto stx = std::make_shared<STTx const>(sit);
        uint256 txID = stx->getTransactionID();

        // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
        // LCOV_EXCL_START
        if (stx->isFlag(tfInnerBatchTxn) &&
            getCurrentTransactionRules()->enabled(featureBatch))
        {
            JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
                                       "tfInnerBatchTxn (handleTransaction).";
            fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
            return;
        }
        // LCOV_EXCL_STOP

        HashRouterFlags flags;
        constexpr std::chrono::seconds tx_interval = 10s;

        if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
        {
            // we have seen this transaction recently
            if (any(flags & HashRouterFlags::BAD))
            {
                fee_.update(Resource::feeUselessData, "known bad");
                JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
            }

            // Erase only if the server has seen this tx. If the server has not
            // seen this tx then the tx could not have been queued for this
            // peer.
            else if (eraseTxQueue && txReduceRelayEnabled())
                removeTxQueue(txID);

            // NOTE(review): statements were lost in extraction here.

            return;
        }

        JLOG(p_journal_.debug()) << "Got tx " << txID;

        bool checkSignature = true;
        if (cluster())
        {
            if (!m->has_deferred() || !m->deferred())
            {
                // Skip local checks if a server we trust
                // put the transaction in its open ledger
                flags |= HashRouterFlags::TRUSTED;
            }

            // for non-validator nodes only -- localPublicKey is set for
            // validators only
            // NOTE(review): the condition opening this block was lost in
            // extraction.
            {
                // For now, be paranoid and have each validator
                // check each transaction, regardless of source
                checkSignature = false;
            }
        }

        // NOTE(review): the conditions of this if/else-if ladder were
        // lost in extraction.
        {
            JLOG(p_journal_.trace())
                << "No new transactions until synchronized";
        }
        else if (
        {
            JLOG(p_journal_.info()) << "Transaction queue is full";
        }
        else
        {
            // NOTE(review): the job-queue call header and capture list
            // were lost in extraction.
                "recvTransaction->checkTransaction",
                flags,
                checkSignature,
                batch,
                stx]() {
                if (auto peer = weak.lock())
                    peer->checkTransaction(
                        flags, checkSignature, stx, batch);
            });
        }
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.warn())
            << "Transaction invalid: " << strHex(m->rawtransaction())
            << ". Exception: " << ex.what();
    }
}
1459
1460void
1462{
1463 auto badData = [&](std::string const& msg) {
1464 fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
1465 JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
1466 };
1467 auto const itype{m->itype()};
1468
1469 // Verify ledger info type
1470 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1471 return badData("Invalid ledger info type");
1472
1473 auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
1474 if (m->has_ltype())
1475 return m->ltype();
1476 return std::nullopt;
1477 }();
1478
1479 if (itype == protocol::liTS_CANDIDATE)
1480 {
1481 if (!m->has_ledgerhash())
1482 return badData("Invalid TX candidate set, missing TX set hash");
1483 }
1484 else if (
1485 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1486 !(ltype && *ltype == protocol::ltCLOSED))
1487 {
1488 return badData("Invalid request");
1489 }
1490
1491 // Verify ledger type
1492 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1493 return badData("Invalid ledger type");
1494
1495 // Verify ledger hash
1496 if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
1497 return badData("Invalid ledger hash");
1498
1499 // Verify ledger sequence
1500 if (m->has_ledgerseq())
1501 {
1502 auto const ledgerSeq{m->ledgerseq()};
1503
1504 // Check if within a reasonable range
1505 using namespace std::chrono_literals;
1507 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1508 {
1509 return badData(
1510 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1511 }
1512 }
1513
1514 // Verify ledger node IDs
1515 if (itype != protocol::liBASE)
1516 {
1517 if (m->nodeids_size() <= 0)
1518 return badData("Invalid ledger node IDs");
1519
1520 for (auto const& nodeId : m->nodeids())
1521 {
1523 return badData("Invalid SHAMap node ID");
1524 }
1525 }
1526
1527 // Verify query type
1528 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1529 return badData("Invalid query type");
1530
1531 // Verify query depth
1532 if (m->has_querydepth())
1533 {
1534 if (m->querydepth() > Tuning::maxQueryDepth ||
1535 itype == protocol::liBASE)
1536 {
1537 return badData("Invalid query depth");
1538 }
1539 }
1540
1541 // Queue a job to process the request
1543 app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
1544 if (auto peer = weak.lock())
1545 peer->processLedgerRequest(m);
1546 });
1547}
1548
1549void
1551{
1552 JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
1554 {
1555 fee_.update(
1556 Resource::feeMalformedRequest, "proof_path_request disabled");
1557 return;
1558 }
1559
1560 fee_.update(
1561 Resource::feeModerateBurdenPeer, "received a proof path request");
1564 jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
1565 if (auto peer = weak.lock())
1566 {
1567 auto reply =
1568 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1569 if (reply.has_error())
1570 {
1571 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1572 peer->charge(
1573 Resource::feeMalformedRequest,
1574 "proof_path_request");
1575 else
1576 peer->charge(
1577 Resource::feeRequestNoReply, "proof_path_request");
1578 }
1579 else
1580 {
1581 peer->send(std::make_shared<Message>(
1582 reply, protocol::mtPROOF_PATH_RESPONSE));
1583 }
1584 }
1585 });
1586}
1587
1588void
1590{
1591 if (!ledgerReplayEnabled_)
1592 {
1593 fee_.update(
1594 Resource::feeMalformedRequest, "proof_path_response disabled");
1595 return;
1596 }
1597
1598 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1599 {
1600 fee_.update(Resource::feeInvalidData, "proof_path_response");
1601 }
1602}
1603
1604void
1606{
1607 JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
1608 if (!ledgerReplayEnabled_)
1609 {
1610 fee_.update(
1611 Resource::feeMalformedRequest, "replay_delta_request disabled");
1612 return;
1613 }
1614
1615 fee_.fee = Resource::feeModerateBurdenPeer;
1616 std::weak_ptr<PeerImp> weak = shared_from_this();
1617 app_.getJobQueue().addJob(
1618 jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
1619 if (auto peer = weak.lock())
1620 {
1621 auto reply =
1622 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1623 if (reply.has_error())
1624 {
1625 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1626 peer->charge(
1627 Resource::feeMalformedRequest,
1628 "replay_delta_request");
1629 else
1630 peer->charge(
1631 Resource::feeRequestNoReply,
1632 "replay_delta_request");
1633 }
1634 else
1635 {
1636 peer->send(std::make_shared<Message>(
1637 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1638 }
1639 }
1640 });
1641}
1642
1643void
1645{
1646 if (!ledgerReplayEnabled_)
1647 {
1648 fee_.update(
1649 Resource::feeMalformedRequest, "replay_delta_response disabled");
1650 return;
1651 }
1652
1653 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1654 {
1655 fee_.update(Resource::feeInvalidData, "replay_delta_response");
1656 }
1657}
1658
1659void
1661{
1662 auto badData = [&](std::string const& msg) {
1663 fee_.update(Resource::feeInvalidData, msg);
1664 JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
1665 };
1666
1667 // Verify ledger hash
1668 if (!stringIsUint256Sized(m->ledgerhash()))
1669 return badData("Invalid ledger hash");
1670
1671 // Verify ledger sequence
1672 {
1673 auto const ledgerSeq{m->ledgerseq()};
1674 if (m->type() == protocol::liTS_CANDIDATE)
1675 {
1676 if (ledgerSeq != 0)
1677 {
1678 return badData(
1679 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1680 }
1681 }
1682 else
1683 {
1684 // Check if within a reasonable range
1685 using namespace std::chrono_literals;
1686 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1687 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1688 {
1689 return badData(
1690 "Invalid ledger sequence " + std::to_string(ledgerSeq));
1691 }
1692 }
1693 }
1694
1695 // Verify ledger info type
1696 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1697 return badData("Invalid ledger info type");
1698
1699 // Verify reply error
1700 if (m->has_error() &&
1701 (m->error() < protocol::reNO_LEDGER ||
1702 m->error() > protocol::reBAD_REQUEST))
1703 {
1704 return badData("Invalid reply error");
1705 }
1706
1707 // Verify ledger nodes.
1708 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1709 {
1710 return badData(
1711 "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
1712 }
1713
1714 // If there is a request cookie, attempt to relay the message
1715 if (m->has_requestcookie())
1716 {
1717 if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1718 {
1719 m->clear_requestcookie();
1720 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1721 }
1722 else
1723 {
1724 JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
1725 }
1726 return;
1727 }
1728
1729 uint256 const ledgerHash{m->ledgerhash()};
1730
1731 // Otherwise check if received data for a candidate transaction set
1732 if (m->type() == protocol::liTS_CANDIDATE)
1733 {
1734 std::weak_ptr<PeerImp> weak{shared_from_this()};
1735 app_.getJobQueue().addJob(
1736 jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
1737 if (auto peer = weak.lock())
1738 {
1739 peer->app_.getInboundTransactions().gotData(
1740 ledgerHash, peer, m);
1741 }
1742 });
1743 return;
1744 }
1745
1746 // Consume the message
1747 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1748}
1749
1750void
1752{
1753 protocol::TMProposeSet& set = *m;
1754
1755 auto const sig = makeSlice(set.signature());
1756
1757 // Preliminary check for the validity of the signature: A DER encoded
1758 // signature can't be longer than 72 bytes.
1759 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1760 (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
1761 {
1762 JLOG(p_journal_.warn()) << "Proposal: malformed";
1763 fee_.update(
1764 Resource::feeInvalidSignature,
1765 " signature can't be longer than 72 bytes");
1766 return;
1767 }
1768
1769 if (!stringIsUint256Sized(set.currenttxhash()) ||
1770 !stringIsUint256Sized(set.previousledger()))
1771 {
1772 JLOG(p_journal_.warn()) << "Proposal: malformed";
1773 fee_.update(Resource::feeMalformedRequest, "bad hashes");
1774 return;
1775 }
1776
1777 // RH TODO: when isTrusted = false we should probably also cache a key
1778 // suppression for 30 seconds to avoid doing a relatively expensive lookup
1779 // every time a spam packet is received
1780 PublicKey const publicKey{makeSlice(set.nodepubkey())};
1781 auto const isTrusted = app_.validators().trusted(publicKey);
1782
1783 // If the operator has specified that untrusted proposals be dropped then
1784 // this happens here I.e. before further wasting CPU verifying the signature
1785 // of an untrusted key
1786 if (!isTrusted)
1787 {
1788 // report untrusted proposal messages
1789 overlay_.reportInboundTraffic(
1790 TrafficCount::category::proposal_untrusted,
1791 Message::messageSize(*m));
1792
1793 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1794 return;
1795 }
1796
1797 uint256 const proposeHash{set.currenttxhash()};
1798 uint256 const prevLedger{set.previousledger()};
1799
1800 NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
1801
1802 uint256 const suppression = proposalUniqueId(
1803 proposeHash,
1804 prevLedger,
1805 set.proposeseq(),
1806 closeTime,
1807 publicKey.slice(),
1808 sig);
1809
1810 if (auto [added, relayed] =
1811 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1812 !added)
1813 {
1814 // Count unique messages (Slots has it's own 'HashRouter'), which a peer
1815 // receives within IDLED seconds since the message has been relayed.
1816 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
1817 overlay_.updateSlotAndSquelch(
1818 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1819
1820 // report duplicate proposal messages
1821 overlay_.reportInboundTraffic(
1822 TrafficCount::category::proposal_duplicate,
1823 Message::messageSize(*m));
1824
1825 JLOG(p_journal_.trace()) << "Proposal: duplicate";
1826
1827 return;
1828 }
1829
1830 if (!isTrusted)
1831 {
1832 if (tracking_.load() == Tracking::diverged)
1833 {
1834 JLOG(p_journal_.debug())
1835 << "Proposal: Dropping untrusted (peer divergence)";
1836 return;
1837 }
1838
1839 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1840 {
1841 JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
1842 return;
1843 }
1844 }
1845
1846 JLOG(p_journal_.trace())
1847 << "Proposal: " << (isTrusted ? "trusted" : "untrusted");
1848
1849 auto proposal = RCLCxPeerPos(
1850 publicKey,
1851 sig,
1852 suppression,
1854 prevLedger,
1855 set.proposeseq(),
1856 proposeHash,
1857 closeTime,
1858 app_.timeKeeper().closeTime(),
1859 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1860
1861 std::weak_ptr<PeerImp> weak = shared_from_this();
1862 app_.getJobQueue().addJob(
1863 isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
1864 "recvPropose->checkPropose",
1865 [weak, isTrusted, m, proposal]() {
1866 if (auto peer = weak.lock())
1867 peer->checkPropose(isTrusted, m, proposal);
1868 });
1869}
1870
1871void
1873{
1874 JLOG(p_journal_.trace()) << "Status: Change";
1875
1876 if (!m->has_networktime())
1877 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1878
1879 {
1880 std::lock_guard sl(recentLock_);
1881 if (!last_status_.has_newstatus() || m->has_newstatus())
1882 last_status_ = *m;
1883 else
1884 {
1885 // preserve old status
1886 protocol::NodeStatus status = last_status_.newstatus();
1887 last_status_ = *m;
1888 m->set_newstatus(status);
1889 }
1890 }
1891
1892 if (m->newevent() == protocol::neLOST_SYNC)
1893 {
1894 bool outOfSync{false};
1895 {
1896 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1897 // guarded by recentLock_.
1898 std::lock_guard sl(recentLock_);
1899 if (!closedLedgerHash_.isZero())
1900 {
1901 outOfSync = true;
1902 closedLedgerHash_.zero();
1903 }
1904 previousLedgerHash_.zero();
1905 }
1906 if (outOfSync)
1907 {
1908 JLOG(p_journal_.debug()) << "Status: Out of sync";
1909 }
1910 return;
1911 }
1912
1913 {
1914 uint256 closedLedgerHash{};
1915 bool const peerChangedLedgers{
1916 m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
1917
1918 {
1919 // Operations on closedLedgerHash_ and previousLedgerHash_ must be
1920 // guarded by recentLock_.
1921 std::lock_guard sl(recentLock_);
1922 if (peerChangedLedgers)
1923 {
1924 closedLedgerHash_ = m->ledgerhash();
1925 closedLedgerHash = closedLedgerHash_;
1926 addLedger(closedLedgerHash, sl);
1927 }
1928 else
1929 {
1930 closedLedgerHash_.zero();
1931 }
1932
1933 if (m->has_ledgerhashprevious() &&
1934 stringIsUint256Sized(m->ledgerhashprevious()))
1935 {
1936 previousLedgerHash_ = m->ledgerhashprevious();
1937 addLedger(previousLedgerHash_, sl);
1938 }
1939 else
1940 {
1941 previousLedgerHash_.zero();
1942 }
1943 }
1944 if (peerChangedLedgers)
1945 {
1946 JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
1947 }
1948 else
1949 {
1950 JLOG(p_journal_.debug()) << "Status: No ledger";
1951 }
1952 }
1953
1954 if (m->has_firstseq() && m->has_lastseq())
1955 {
1956 std::lock_guard sl(recentLock_);
1957
1958 minLedger_ = m->firstseq();
1959 maxLedger_ = m->lastseq();
1960
1961 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1962 minLedger_ = maxLedger_ = 0;
1963 }
1964
1965 if (m->has_ledgerseq() &&
1966 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1967 {
1968 checkTracking(
1969 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1970 }
1971
1972 app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {
1974
1975 if (m->has_newstatus())
1976 {
1977 switch (m->newstatus())
1978 {
1979 case protocol::nsCONNECTING:
1980 j[jss::status] = "CONNECTING";
1981 break;
1982 case protocol::nsCONNECTED:
1983 j[jss::status] = "CONNECTED";
1984 break;
1985 case protocol::nsMONITORING:
1986 j[jss::status] = "MONITORING";
1987 break;
1988 case protocol::nsVALIDATING:
1989 j[jss::status] = "VALIDATING";
1990 break;
1991 case protocol::nsSHUTTING:
1992 j[jss::status] = "SHUTTING";
1993 break;
1994 }
1995 }
1996
1997 if (m->has_newevent())
1998 {
1999 switch (m->newevent())
2000 {
2001 case protocol::neCLOSING_LEDGER:
2002 j[jss::action] = "CLOSING_LEDGER";
2003 break;
2004 case protocol::neACCEPTED_LEDGER:
2005 j[jss::action] = "ACCEPTED_LEDGER";
2006 break;
2007 case protocol::neSWITCHED_LEDGER:
2008 j[jss::action] = "SWITCHED_LEDGER";
2009 break;
2010 case protocol::neLOST_SYNC:
2011 j[jss::action] = "LOST_SYNC";
2012 break;
2013 }
2014 }
2015
2016 if (m->has_ledgerseq())
2017 {
2018 j[jss::ledger_index] = m->ledgerseq();
2019 }
2020
2021 if (m->has_ledgerhash())
2022 {
2023 uint256 closedLedgerHash{};
2024 {
2025 std::lock_guard sl(recentLock_);
2026 closedLedgerHash = closedLedgerHash_;
2027 }
2028 j[jss::ledger_hash] = to_string(closedLedgerHash);
2029 }
2030
2031 if (m->has_networktime())
2032 {
2033 j[jss::date] = Json::UInt(m->networktime());
2034 }
2035
2036 if (m->has_firstseq() && m->has_lastseq())
2037 {
2038 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
2039 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
2040 }
2041
2042 return j;
2043 });
2044}
2045
2046void
2047PeerImp::checkTracking(std::uint32_t validationSeq)
2048{
2049 std::uint32_t serverSeq;
2050 {
2051 // Extract the sequence number of the highest
2052 // ledger this peer has
2053 std::lock_guard sl(recentLock_);
2054
2055 serverSeq = maxLedger_;
2056 }
2057 if (serverSeq != 0)
2058 {
2059 // Compare the peer's ledger sequence to the
2060 // sequence of a recently-validated ledger
2061 checkTracking(serverSeq, validationSeq);
2062 }
2063}
2064
2065void
2066PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
2067{
2068 int diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2069
2070 if (diff < Tuning::convergedLedgerLimit)
2071 {
2072 // The peer's ledger sequence is close to the validation's
2073 tracking_ = Tracking::converged;
2074 }
2075
2076 if ((diff > Tuning::divergedLedgerLimit) &&
2077 (tracking_.load() != Tracking::diverged))
2078 {
2079 // The peer's ledger sequence is way off the validation's
2080 std::lock_guard sl(recentLock_);
2081
2082 tracking_ = Tracking::diverged;
2083 trackingTime_ = clock_type::now();
2084 }
2085}
2086
2087void
2089{
2090 if (!stringIsUint256Sized(m->hash()))
2091 {
2092 fee_.update(Resource::feeMalformedRequest, "bad hash");
2093 return;
2094 }
2095
2096 uint256 const hash{m->hash()};
2097
2098 if (m->status() == protocol::tsHAVE)
2099 {
2100 std::lock_guard sl(recentLock_);
2101
2102 if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2103 recentTxSets_.end())
2104 {
2105 fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
2106 return;
2107 }
2108
2109 recentTxSets_.push_back(hash);
2110 }
2111}
2112
2113void
2114PeerImp::onValidatorListMessage(
2115 std::string const& messageType,
2116 std::string const& manifest,
2117 std::uint32_t version,
2118 std::vector<ValidatorBlobInfo> const& blobs)
2119{
2120 // If there are no blobs, the message is malformed (possibly because of
2121 // ValidatorList class rules), so charge accordingly and skip processing.
2122 if (blobs.empty())
2123 {
2124 JLOG(p_journal_.warn()) << "Ignored malformed " << messageType;
2125 // This shouldn't ever happen with a well-behaved peer
2126 fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
2127 return;
2128 }
2129
2130 auto const hash = sha512Half(manifest, blobs, version);
2131
2132 JLOG(p_journal_.debug()) << "Received " << messageType;
2133
2134 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2135 {
2136 JLOG(p_journal_.debug())
2137 << messageType << ": received duplicate " << messageType;
2138 // Charging this fee here won't hurt the peer in the normal
2139 // course of operation (ie. refresh every 5 minutes), but
2140 // will add up if the peer is misbehaving.
2141 fee_.update(Resource::feeUselessData, "duplicate");
2142 return;
2143 }
2144
2145 auto const applyResult = app_.validators().applyListsAndBroadcast(
2146 manifest,
2147 version,
2148 blobs,
2149 remote_address_.to_string(),
2150 hash,
2151 app_.overlay(),
2152 app_.getHashRouter(),
2153 app_.getOPs());
2154
2155 JLOG(p_journal_.debug())
2156 << "Processed " << messageType << " version " << version << " from "
2157 << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2158 : "unknown or invalid publisher")
2159 << " with best result " << to_string(applyResult.bestDisposition());
2160
2161 // Act based on the best result
2162 switch (applyResult.bestDisposition())
2163 {
2164 // New list
2165 case ListDisposition::accepted:
2166 // Newest list is expired, and that needs to be broadcast, too
2167 case ListDisposition::expired:
2168 // Future list
2169 case ListDisposition::pending: {
2170 std::lock_guard<std::mutex> sl(recentLock_);
2171
2172 XRPL_ASSERT(
2173 applyResult.publisherKey,
2174 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2175 "set");
2176 auto const& pubKey = *applyResult.publisherKey;
2177#ifndef NDEBUG
2178 if (auto const iter = publisherListSequences_.find(pubKey);
2179 iter != publisherListSequences_.end())
2180 {
2181 XRPL_ASSERT(
2182 iter->second < applyResult.sequence,
2183 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2184 }
2185#endif
2186 publisherListSequences_[pubKey] = applyResult.sequence;
2187 }
2188 break;
2189 case ListDisposition::same_sequence:
2190 case ListDisposition::known_sequence:
2191#ifndef NDEBUG
2192 {
2193 std::lock_guard<std::mutex> sl(recentLock_);
2194 XRPL_ASSERT(
2195 applyResult.sequence && applyResult.publisherKey,
2196 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2197 "and set publisher key");
2198 XRPL_ASSERT(
2199 publisherListSequences_[*applyResult.publisherKey] <=
2200 applyResult.sequence,
2201 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2202 }
2203#endif // !NDEBUG
2204
2205 break;
2206 case ListDisposition::stale:
2207 case ListDisposition::untrusted:
2208 case ListDisposition::invalid:
2209 case ListDisposition::unsupported_version:
2210 break;
2211 // LCOV_EXCL_START
2212 default:
2213 UNREACHABLE(
2214 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2215 "disposition");
2216 // LCOV_EXCL_STOP
2217 }
2218
2219 // Charge based on the worst result
2220 switch (applyResult.worstDisposition())
2221 {
2222 case ListDisposition::accepted:
2223 case ListDisposition::expired:
2224 case ListDisposition::pending:
2225 // No charges for good data
2226 break;
2227 case ListDisposition::same_sequence:
2228 case ListDisposition::known_sequence:
2229 // Charging this fee here won't hurt the peer in the normal
2230 // course of operation (ie. refresh every 5 minutes), but
2231 // will add up if the peer is misbehaving.
2232 fee_.update(
2233 Resource::feeUselessData,
2234 " duplicate (same_sequence or known_sequence)");
2235 break;
2236 case ListDisposition::stale:
2237 // There are very few good reasons for a peer to send an
2238 // old list, particularly more than once.
2239 fee_.update(Resource::feeInvalidData, "expired");
2240 break;
2241 case ListDisposition::untrusted:
2242 // Charging this fee here won't hurt the peer in the normal
2243 // course of operation (ie. refresh every 5 minutes), but
2244 // will add up if the peer is misbehaving.
2245 fee_.update(Resource::feeUselessData, "untrusted");
2246 break;
2247 case ListDisposition::invalid:
2248 // This shouldn't ever happen with a well-behaved peer
2249 fee_.update(
2250 Resource::feeInvalidSignature, "invalid list disposition");
2251 break;
2252 case ListDisposition::unsupported_version:
2253 // During a version transition, this may be legitimate.
2254 // If it happens frequently, that's probably bad.
2255 fee_.update(Resource::feeInvalidData, "version");
2256 break;
2257 // LCOV_EXCL_START
2258 default:
2259 UNREACHABLE(
2260 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2261 "disposition");
2262 // LCOV_EXCL_STOP
2263 }
2264
2265 // Log based on all the results.
2266 for (auto const& [disp, count] : applyResult.dispositions)
2267 {
2268 switch (disp)
2269 {
2270 // New list
2271 case ListDisposition::accepted:
2272 JLOG(p_journal_.debug())
2273 << "Applied " << count << " new " << messageType;
2274 break;
2275 // Newest list is expired, and that needs to be broadcast, too
2276 case ListDisposition::expired:
2277 JLOG(p_journal_.debug())
2278 << "Applied " << count << " expired " << messageType;
2279 break;
2280 // Future list
2281 case ListDisposition::pending:
2282 JLOG(p_journal_.debug())
2283 << "Processed " << count << " future " << messageType;
2284 break;
2285 case ListDisposition::same_sequence:
2286 JLOG(p_journal_.warn())
2287 << "Ignored " << count << " " << messageType
2288 << "(s) with current sequence";
2289 break;
2290 case ListDisposition::known_sequence:
2291 JLOG(p_journal_.warn())
2292 << "Ignored " << count << " " << messageType
2293 << "(s) with future sequence";
2294 break;
2295 case ListDisposition::stale:
2296 JLOG(p_journal_.warn())
2297 << "Ignored " << count << "stale " << messageType;
2298 break;
2299 case ListDisposition::untrusted:
2300 JLOG(p_journal_.warn())
2301 << "Ignored " << count << " untrusted " << messageType;
2302 break;
2303 case ListDisposition::unsupported_version:
2304 JLOG(p_journal_.warn())
2305 << "Ignored " << count << "unsupported version "
2306 << messageType;
2307 break;
2308 case ListDisposition::invalid:
2309 JLOG(p_journal_.warn())
2310 << "Ignored " << count << "invalid " << messageType;
2311 break;
2312 // LCOV_EXCL_START
2313 default:
2314 UNREACHABLE(
2315 "ripple::PeerImp::onValidatorListMessage : invalid list "
2316 "disposition");
2317 // LCOV_EXCL_STOP
2318 }
2319 }
2320}
2321
2322void
2324{
2325 try
2326 {
2327 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2328 {
2329 JLOG(p_journal_.debug())
2330 << "ValidatorList: received validator list from peer using "
2331 << "protocol version " << to_string(protocol_)
2332 << " which shouldn't support this feature.";
2333 fee_.update(Resource::feeUselessData, "unsupported peer");
2334 return;
2335 }
2336 onValidatorListMessage(
2337 "ValidatorList",
2338 m->manifest(),
2339 m->version(),
2340 ValidatorList::parseBlobs(*m));
2341 }
2342 catch (std::exception const& e)
2343 {
2344 JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what();
2345 using namespace std::string_literals;
2346 fee_.update(Resource::feeInvalidData, e.what());
2347 }
2348}
2349
2350void
2351PeerImp::onMessage(
2353{
2354 try
2355 {
2356 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2357 {
2358 JLOG(p_journal_.debug())
2359 << "ValidatorListCollection: received validator list from peer "
2360 << "using protocol version " << to_string(protocol_)
2361 << " which shouldn't support this feature.";
2362 fee_.update(Resource::feeUselessData, "unsupported peer");
2363 return;
2364 }
2365 else if (m->version() < 2)
2366 {
2367 JLOG(p_journal_.debug())
2368 << "ValidatorListCollection: received invalid validator list "
2369 "version "
2370 << m->version() << " from peer using protocol version "
2371 << to_string(protocol_);
2372 fee_.update(Resource::feeInvalidData, "wrong version");
2373 return;
2374 }
2375 onValidatorListMessage(
2376 "ValidatorListCollection",
2377 m->manifest(),
2378 m->version(),
2379 ValidatorList::parseBlobs(*m));
2380 }
2381 catch (std::exception const& e)
2382 {
2383 JLOG(p_journal_.warn())
2384 << "ValidatorListCollection: Exception, " << e.what();
2385 using namespace std::string_literals;
2386 fee_.update(Resource::feeInvalidData, e.what());
2387 }
2388}
2389
2390void
2392{
2393 if (m->validation().size() < 50)
2394 {
2395 JLOG(p_journal_.warn()) << "Validation: Too small";
2396 fee_.update(Resource::feeMalformedRequest, "too small");
2397 return;
2398 }
2399
2400 try
2401 {
2402 auto const closeTime = app_.timeKeeper().closeTime();
2403
2405 {
2406 SerialIter sit(makeSlice(m->validation()));
2408 std::ref(sit),
2409 [this](PublicKey const& pk) {
2410 return calcNodeID(
2411 app_.validatorManifests().getMasterKey(pk));
2412 },
2413 false);
2414 val->setSeen(closeTime);
2415 }
2416
2417 if (!isCurrent(
2418 app_.getValidations().parms(),
2419 app_.timeKeeper().closeTime(),
2420 val->getSignTime(),
2421 val->getSeenTime()))
2422 {
2423 JLOG(p_journal_.trace()) << "Validation: Not current";
2424 fee_.update(Resource::feeUselessData, "not current");
2425 return;
2426 }
2427
2428 // RH TODO: when isTrusted = false we should probably also cache a key
2429 // suppression for 30 seconds to avoid doing a relatively expensive
2430 // lookup every time a spam packet is received
2431 auto const isTrusted =
2432 app_.validators().trusted(val->getSignerPublic());
2433
2434 // If the operator has specified that untrusted validations be
2435 // dropped then this happens here I.e. before further wasting CPU
2436 // verifying the signature of an untrusted key
2437 if (!isTrusted)
2438 {
2439 // increase untrusted validations received
2440 overlay_.reportInboundTraffic(
2441 TrafficCount::category::validation_untrusted,
2442 Message::messageSize(*m));
2443
2444 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2445 return;
2446 }
2447
2448 auto key = sha512Half(makeSlice(m->validation()));
2449
2450 auto [added, relayed] =
2451 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2452
2453 if (!added)
2454 {
2455 // Count unique messages (Slots has it's own 'HashRouter'), which a
2456 // peer receives within IDLED seconds since the message has been
2457 // relayed.
2458 if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
2459 overlay_.updateSlotAndSquelch(
2460 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2461
2462 // increase duplicate validations received
2463 overlay_.reportInboundTraffic(
2464 TrafficCount::category::validation_duplicate,
2465 Message::messageSize(*m));
2466
2467 JLOG(p_journal_.trace()) << "Validation: duplicate";
2468 return;
2469 }
2470
2471 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2472 {
2473 JLOG(p_journal_.debug())
2474 << "Dropping untrusted validation from diverged peer";
2475 }
2476 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2477 {
2478 std::string const name = [isTrusted, val]() {
2479 std::string ret =
2480 isTrusted ? "Trusted validation" : "Untrusted validation";
2481
2482#ifdef DEBUG
2483 ret += " " +
2484 std::to_string(val->getFieldU32(sfLedgerSequence)) + ": " +
2485 to_string(val->getNodeID());
2486#endif
2487
2488 return ret;
2489 }();
2490
2491 std::weak_ptr<PeerImp> weak = shared_from_this();
2492 app_.getJobQueue().addJob(
2493 isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
2494 name,
2495 [weak, val, m, key]() {
2496 if (auto peer = weak.lock())
2497 peer->checkValidation(val, key, m);
2498 });
2499 }
2500 else
2501 {
2502 JLOG(p_journal_.debug())
2503 << "Dropping untrusted validation for load";
2504 }
2505 }
2506 catch (std::exception const& e)
2507 {
2508 JLOG(p_journal_.warn())
2509 << "Exception processing validation: " << e.what();
2510 using namespace std::string_literals;
2511 fee_.update(Resource::feeMalformedRequest, e.what());
2512 }
2513}
2514
2515void
2517{
2518 protocol::TMGetObjectByHash& packet = *m;
2519
2520 JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
2521 << " " << packet.objects_size();
2522
2523 if (packet.query())
2524 {
2525 // this is a query
2526 if (send_queue_.size() >= Tuning::dropSendQueue)
2527 {
2528 JLOG(p_journal_.debug()) << "GetObject: Large send queue";
2529 return;
2530 }
2531
2532 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2533 {
2534 doFetchPack(m);
2535 return;
2536 }
2537
2538 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2539 {
2540 if (!txReduceRelayEnabled())
2541 {
2542 JLOG(p_journal_.error())
2543 << "TMGetObjectByHash: tx reduce-relay is disabled";
2544 fee_.update(Resource::feeMalformedRequest, "disabled");
2545 return;
2546 }
2547
2548 std::weak_ptr<PeerImp> weak = shared_from_this();
2549 app_.getJobQueue().addJob(
2550 jtREQUESTED_TXN, "doTransactions", [weak, m]() {
2551 if (auto peer = weak.lock())
2552 peer->doTransactions(m);
2553 });
2554 return;
2555 }
2556
2557 protocol::TMGetObjectByHash reply;
2558
2559 reply.set_query(false);
2560
2561 if (packet.has_seq())
2562 reply.set_seq(packet.seq());
2563
2564 reply.set_type(packet.type());
2565
2566 if (packet.has_ledgerhash())
2567 {
2568 if (!stringIsUint256Sized(packet.ledgerhash()))
2569 {
2570 fee_.update(Resource::feeMalformedRequest, "ledger hash");
2571 return;
2572 }
2573
2574 reply.set_ledgerhash(packet.ledgerhash());
2575 }
2576
2577 fee_.update(
2578 Resource::feeModerateBurdenPeer,
2579 " received a get object by hash request");
2580
2581 // This is a very minimal implementation
2582 for (int i = 0; i < packet.objects_size(); ++i)
2583 {
2584 auto const& obj = packet.objects(i);
2585 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2586 {
2587 uint256 const hash{obj.hash()};
2588 // VFALCO TODO Move this someplace more sensible so we dont
2589 // need to inject the NodeStore interfaces.
2590 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2591 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2592 if (nodeObject)
2593 {
2594 protocol::TMIndexedObject& newObj = *reply.add_objects();
2595 newObj.set_hash(hash.begin(), hash.size());
2596 newObj.set_data(
2597 &nodeObject->getData().front(),
2598 nodeObject->getData().size());
2599
2600 if (obj.has_nodeid())
2601 newObj.set_index(obj.nodeid());
2602 if (obj.has_ledgerseq())
2603 newObj.set_ledgerseq(obj.ledgerseq());
2604
2605 // VFALCO NOTE "seq" in the message is obsolete
2606 }
2607 }
2608 }
2609
2610 JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
2611 << packet.objects_size();
2612 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2613 }
2614 else
2615 {
2616 // this is a reply
2617 std::uint32_t pLSeq = 0;
2618 bool pLDo = true;
2619 bool progress = false;
2620
2621 for (int i = 0; i < packet.objects_size(); ++i)
2622 {
2623 protocol::TMIndexedObject const& obj = packet.objects(i);
2624
2625 if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2626 {
2627 if (obj.has_ledgerseq())
2628 {
2629 if (obj.ledgerseq() != pLSeq)
2630 {
2631 if (pLDo && (pLSeq != 0))
2632 {
2633 JLOG(p_journal_.debug())
2634 << "GetObj: Full fetch pack for " << pLSeq;
2635 }
2636 pLSeq = obj.ledgerseq();
2637 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2638
2639 if (!pLDo)
2640 {
2641 JLOG(p_journal_.debug())
2642 << "GetObj: Late fetch pack for " << pLSeq;
2643 }
2644 else
2645 progress = true;
2646 }
2647 }
2648
2649 if (pLDo)
2650 {
2651 uint256 const hash{obj.hash()};
2652
2653 app_.getLedgerMaster().addFetchPack(
2654 hash,
2656 obj.data().begin(), obj.data().end()));
2657 }
2658 }
2659 }
2660
2661 if (pLDo && (pLSeq != 0))
2662 {
2663 JLOG(p_journal_.debug())
2664 << "GetObj: Partial fetch pack for " << pLSeq;
2665 }
2666 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2667 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2668 }
2669}
2670
2671void
2673{
2674 if (!txReduceRelayEnabled())
2675 {
2676 JLOG(p_journal_.error())
2677 << "TMHaveTransactions: tx reduce-relay is disabled";
2678 fee_.update(Resource::feeMalformedRequest, "disabled");
2679 return;
2680 }
2681
2682 std::weak_ptr<PeerImp> weak = shared_from_this();
2683 app_.getJobQueue().addJob(
2684 jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
2685 if (auto peer = weak.lock())
2686 peer->handleHaveTransactions(m);
2687 });
2688}
2689
2690void
2691PeerImp::handleHaveTransactions(
2693{
2694 protocol::TMGetObjectByHash tmBH;
2695 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2696 tmBH.set_query(true);
2697
2698 JLOG(p_journal_.trace())
2699 << "received TMHaveTransactions " << m->hashes_size();
2700
2701 for (std::uint32_t i = 0; i < m->hashes_size(); i++)
2702 {
2703 if (!stringIsUint256Sized(m->hashes(i)))
2704 {
2705 JLOG(p_journal_.error())
2706 << "TMHaveTransactions with invalid hash size";
2707 fee_.update(Resource::feeMalformedRequest, "hash size");
2708 return;
2709 }
2710
2711 uint256 hash(m->hashes(i));
2712
2713 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2714
2715 JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;
2716
2717 if (!txn)
2718 {
2719 JLOG(p_journal_.debug()) << "adding transaction to request";
2720
2721 auto obj = tmBH.add_objects();
2722 obj->set_hash(hash.data(), hash.size());
2723 }
2724 else
2725 {
2726 // Erase only if a peer has seen this tx. If the peer has not
2727 // seen this tx then the tx could not has been queued for this
2728 // peer.
2729 removeTxQueue(hash);
2730 }
2731 }
2732
2733 JLOG(p_journal_.trace())
2734 << "transaction request object is " << tmBH.objects_size();
2735
2736 if (tmBH.objects_size() > 0)
2737 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2738}
2739
2740void
2742{
2743 if (!txReduceRelayEnabled())
2744 {
2745 JLOG(p_journal_.error())
2746 << "TMTransactions: tx reduce-relay is disabled";
2747 fee_.update(Resource::feeMalformedRequest, "disabled");
2748 return;
2749 }
2750
2751 JLOG(p_journal_.trace())
2752 << "received TMTransactions " << m->transactions_size();
2753
2754 overlay_.addTxMetrics(m->transactions_size());
2755
2756 for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
2757 handleTransaction(
2759 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2760 false,
2761 true);
2762}
2763
2764void
2765PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
2766{
2767 using on_message_fn =
2769 if (!strand_.running_in_this_thread())
2770 return post(
2771 strand_,
2772 std::bind(
2773 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2774
2775 if (!m->has_validatorpubkey())
2776 {
2777 fee_.update(Resource::feeInvalidData, "squelch no pubkey");
2778 return;
2779 }
2780 auto validator = m->validatorpubkey();
2781 auto const slice{makeSlice(validator)};
2782 if (!publicKeyType(slice))
2783 {
2784 fee_.update(Resource::feeInvalidData, "squelch bad pubkey");
2785 return;
2786 }
2787 PublicKey key(slice);
2788
2789 // Ignore the squelch for validator's own messages.
2790 if (key == app_.getValidationPublicKey())
2791 {
2792 JLOG(p_journal_.debug())
2793 << "onMessage: TMSquelch discarding validator's squelch " << slice;
2794 return;
2795 }
2796
2797 std::uint32_t duration =
2798 m->has_squelchduration() ? m->squelchduration() : 0;
2799 if (!m->squelch())
2800 squelch_.removeSquelch(key);
2801 else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
2802 fee_.update(Resource::feeInvalidData, "squelch duration");
2803
2804 JLOG(p_journal_.debug())
2805 << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
2806}
2807
2808//--------------------------------------------------------------------------
2809
2810void
2811PeerImp::addLedger(
2812 uint256 const& hash,
2813 std::lock_guard<std::mutex> const& lockedRecentLock)
2814{
2815 // lockedRecentLock is passed as a reminder that recentLock_ must be
2816 // locked by the caller.
2817 (void)lockedRecentLock;
2818
2819 if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2820 recentLedgers_.end())
2821 return;
2822
2823 recentLedgers_.push_back(hash);
2824}
2825
2826void
2827PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
2828{
2829 // VFALCO TODO Invert this dependency using an observer and shared state
2830 // object. Don't queue fetch pack jobs if we're under load or we already
2831 // have some queued.
2832 if (app_.getFeeTrack().isLoadedLocal() ||
2833 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2834 (app_.getJobQueue().getJobCount(jtPACK) > 10))
2835 {
2836 JLOG(p_journal_.info()) << "Too busy to make fetch pack";
2837 return;
2838 }
2839
2840 if (!stringIsUint256Sized(packet->ledgerhash()))
2841 {
2842 JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
2843 fee_.update(Resource::feeMalformedRequest, "hash size");
2844 return;
2845 }
2846
2847 fee_.fee = Resource::feeHeavyBurdenPeer;
2848
2849 uint256 const hash{packet->ledgerhash()};
2850
2851 std::weak_ptr<PeerImp> weak = shared_from_this();
2852 auto elapsed = UptimeClock::now();
2853 auto const pap = &app_;
2854 app_.getJobQueue().addJob(
2855 jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2856 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2857 });
2858}
2859
2860void
2861PeerImp::doTransactions(
2863{
2864 protocol::TMTransactions reply;
2865
2866 JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
2867 << packet->objects_size();
2868
2869 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2870 {
2871 JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
2872 fee_.update(Resource::feeMalformedRequest, "too big");
2873 return;
2874 }
2875
2876 for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
2877 {
2878 auto const& obj = packet->objects(i);
2879
2880 if (!stringIsUint256Sized(obj.hash()))
2881 {
2882 fee_.update(Resource::feeMalformedRequest, "hash size");
2883 return;
2884 }
2885
2886 uint256 hash(obj.hash());
2887
2888 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2889
2890 if (!txn)
2891 {
2892 JLOG(p_journal_.error()) << "doTransactions, transaction not found "
2893 << Slice(hash.data(), hash.size());
2894 fee_.update(Resource::feeMalformedRequest, "tx not found");
2895 return;
2896 }
2897
2898 Serializer s;
2899 auto tx = reply.add_transactions();
2900 auto sttx = txn->getSTransaction();
2901 sttx->add(s);
2902 tx->set_rawtransaction(s.data(), s.size());
2903 tx->set_status(
2904 txn->getStatus() == INCLUDED ? protocol::tsCURRENT
2905 : protocol::tsNEW);
2906 tx->set_receivetimestamp(
2907 app_.timeKeeper().now().time_since_epoch().count());
2908 tx->set_deferred(txn->getSubmitResult().queued);
2909 }
2910
2911 if (reply.transactions_size() > 0)
2912 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2913}
2914
2915void
2916PeerImp::checkTransaction(
2917 HashRouterFlags flags,
2918 bool checkSignature,
2919 std::shared_ptr<STTx const> const& stx,
2920 bool batch)
2921{
2922 // VFALCO TODO Rewrite to not use exceptions
2923 try
2924 {
2925 // charge strongly for relaying batch txns
2926 // LCOV_EXCL_START
2927 if (stx->isFlag(tfInnerBatchTxn) &&
2928 getCurrentTransactionRules()->enabled(featureBatch))
2929 {
2930 JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
2931 "tfInnerBatchTxn (checkSignature).";
2932 charge(Resource::feeModerateBurdenPeer, "inner batch txn");
2933 return;
2934 }
2935 // LCOV_EXCL_STOP
2936
2937 // Expired?
2938 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2939 (stx->getFieldU32(sfLastLedgerSequence) <
2940 app_.getLedgerMaster().getValidLedgerIndex()))
2941 {
2942 JLOG(p_journal_.info())
2943 << "Marking transaction " << stx->getTransactionID()
2944 << "as BAD because it's expired";
2945 app_.getHashRouter().setFlags(
2946 stx->getTransactionID(), HashRouterFlags::BAD);
2947 charge(Resource::feeUselessData, "expired tx");
2948 return;
2949 }
2950
2951 if (isPseudoTx(*stx))
2952 {
2953 // Don't do anything with pseudo transactions except put them in the
2954 // TransactionMaster cache
2955 std::string reason;
2956 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2957 XRPL_ASSERT(
2958 tx->getStatus() == NEW,
2959 "ripple::PeerImp::checkTransaction Transaction created "
2960 "correctly");
2961 if (tx->getStatus() == NEW)
2962 {
2963 JLOG(p_journal_.debug())
2964 << "Processing " << (batch ? "batch" : "unsolicited")
2965 << " pseudo-transaction tx " << tx->getID();
2966
2967 app_.getMasterTransaction().canonicalize(&tx);
2968 // Tell the overlay about it, but don't relay it.
2969 auto const toSkip =
2970 app_.getHashRouter().shouldRelay(tx->getID());
2971 if (toSkip)
2972 {
2973 JLOG(p_journal_.debug())
2974 << "Passing skipped pseudo pseudo-transaction tx "
2975 << tx->getID();
2976 app_.overlay().relay(tx->getID(), {}, *toSkip);
2977 }
2978 if (!batch)
2979 {
2980 JLOG(p_journal_.debug())
2981 << "Charging for pseudo-transaction tx " << tx->getID();
2982 charge(Resource::feeUselessData, "pseudo tx");
2983 }
2984
2985 return;
2986 }
2987 }
2988
2989 if (checkSignature)
2990 {
2991 // Check the signature before handing off to the job queue.
2992 if (auto [valid, validReason] = checkValidity(
2993 app_.getHashRouter(),
2994 *stx,
2995 app_.getLedgerMaster().getValidatedRules(),
2996 app_.config());
2997 valid != Validity::Valid)
2998 {
2999 if (!validReason.empty())
3000 {
3001 JLOG(p_journal_.debug())
3002 << "Exception checking transaction: " << validReason;
3003 }
3004
3005 // Probably not necessary to set HashRouterFlags::BAD, but
3006 // doesn't hurt.
3007 app_.getHashRouter().setFlags(
3008 stx->getTransactionID(), HashRouterFlags::BAD);
3009 charge(
3010 Resource::feeInvalidSignature,
3011 "check transaction signature failure");
3012 return;
3013 }
3014 }
3015 else
3016 {
3018 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
3019 }
3020
3021 std::string reason;
3022 auto tx = std::make_shared<Transaction>(stx, reason, app_);
3023
3024 if (tx->getStatus() == INVALID)
3025 {
3026 if (!reason.empty())
3027 {
3028 JLOG(p_journal_.debug())
3029 << "Exception checking transaction: " << reason;
3030 }
3031 app_.getHashRouter().setFlags(
3032 stx->getTransactionID(), HashRouterFlags::BAD);
3033 charge(Resource::feeInvalidSignature, "tx (impossible)");
3034 return;
3035 }
3036
3037 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
3038 app_.getOPs().processTransaction(
3039 tx, trusted, false, NetworkOPs::FailHard::no);
3040 }
3041 catch (std::exception const& ex)
3042 {
3043 JLOG(p_journal_.warn())
3044 << "Exception in " << __func__ << ": " << ex.what();
3045 app_.getHashRouter().setFlags(
3046 stx->getTransactionID(), HashRouterFlags::BAD);
3047 using namespace std::string_literals;
3048 charge(Resource::feeInvalidData, "tx "s + ex.what());
3049 }
3050}
3051
3052// Called from our JobQueue
3053void
3054PeerImp::checkPropose(
3055 bool isTrusted,
3057 RCLCxPeerPos peerPos)
3058{
3059 JLOG(p_journal_.trace())
3060 << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";
3061
3062 XRPL_ASSERT(packet, "ripple::PeerImp::checkPropose : non-null packet");
3063
3064 if (!cluster() && !peerPos.checkSign())
3065 {
3066 std::string desc{"Proposal fails sig check"};
3067 JLOG(p_journal_.warn()) << desc;
3068 charge(Resource::feeInvalidSignature, desc);
3069 return;
3070 }
3071
3072 bool relay;
3073
3074 if (isTrusted)
3075 relay = app_.getOPs().processTrustedProposal(peerPos);
3076 else
3077 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3078
3079 if (relay)
3080 {
3081 // haveMessage contains peers, which are suppressed; i.e. the peers
3082 // are the source of the message, consequently the message should
3083 // not be relayed to these peers. But the message must be counted
3084 // as part of the squelch logic.
3085 auto haveMessage = app_.overlay().relay(
3086 *packet, peerPos.suppressionID(), peerPos.publicKey());
3087 if (!haveMessage.empty())
3088 overlay_.updateSlotAndSquelch(
3089 peerPos.suppressionID(),
3090 peerPos.publicKey(),
3091 std::move(haveMessage),
3092 protocol::mtPROPOSE_LEDGER);
3093 }
3094}
3095
3096void
3097PeerImp::checkValidation(
3099 uint256 const& key,
3101{
3102 if (!val->isValid())
3103 {
3104 std::string desc{"Validation forwarded by peer is invalid"};
3105 JLOG(p_journal_.debug()) << desc;
3106 charge(Resource::feeInvalidSignature, desc);
3107 return;
3108 }
3109
3110 // FIXME it should be safe to remove this try/catch. Investigate codepaths.
3111 try
3112 {
3113 if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
3114 cluster())
3115 {
3116 // haveMessage contains peers, which are suppressed; i.e. the peers
3117 // are the source of the message, consequently the message should
3118 // not be relayed to these peers. But the message must be counted
3119 // as part of the squelch logic.
3120 auto haveMessage =
3121 overlay_.relay(*packet, key, val->getSignerPublic());
3122 if (!haveMessage.empty())
3123 {
3124 overlay_.updateSlotAndSquelch(
3125 key,
3126 val->getSignerPublic(),
3127 std::move(haveMessage),
3128 protocol::mtVALIDATION);
3129 }
3130 }
3131 }
3132 catch (std::exception const& ex)
3133 {
3134 JLOG(p_journal_.trace())
3135 << "Exception processing validation: " << ex.what();
3136 using namespace std::string_literals;
3137 charge(Resource::feeMalformedRequest, "validation "s + ex.what());
3138 }
3139}
3140
3141// Returns the set of peers that can help us get
3142// the TX tree with the specified root hash.
3143//
3145getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
3146{
3148 int retScore = 0;
3149
3151 if (p->hasTxSet(rootHash) && p.get() != skip)
3152 {
3153 auto score = p->getScore(true);
3154 if (!ret || (score > retScore))
3155 {
3156 ret = std::move(p);
3157 retScore = score;
3158 }
3159 }
3160 });
3161
3162 return ret;
3163}
3164
3165// Returns a random peer weighted by how likely to
3166// have the ledger and how responsive it is.
3167//
3170 OverlayImpl& ov,
3171 uint256 const& ledgerHash,
3172 LedgerIndex ledger,
3173 PeerImp const* skip)
3174{
3176 int retScore = 0;
3177
3179 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3180 {
3181 auto score = p->getScore(true);
3182 if (!ret || (score > retScore))
3183 {
3184 ret = std::move(p);
3185 retScore = score;
3186 }
3187 }
3188 });
3189
3190 return ret;
3191}
3192
3193void
3194PeerImp::sendLedgerBase(
3195 std::shared_ptr<Ledger const> const& ledger,
3196 protocol::TMLedgerData& ledgerData)
3197{
3198 JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
3199
3200 Serializer s(sizeof(LedgerInfo));
3201 addRaw(ledger->info(), s);
3202 ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3203
3204 auto const& stateMap{ledger->stateMap()};
3205 if (stateMap.getHash() != beast::zero)
3206 {
3207 // Return account state root node if possible
3208 Serializer root(768);
3209
3210 stateMap.serializeRoot(root);
3211 ledgerData.add_nodes()->set_nodedata(
3212 root.getDataPtr(), root.getLength());
3213
3214 if (ledger->info().txHash != beast::zero)
3215 {
3216 auto const& txMap{ledger->txMap()};
3217 if (txMap.getHash() != beast::zero)
3218 {
3219 // Return TX root node if possible
3220 root.erase();
3221 txMap.serializeRoot(root);
3222 ledgerData.add_nodes()->set_nodedata(
3223 root.getDataPtr(), root.getLength());
3224 }
3225 }
3226 }
3227
3228 auto message{
3229 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3230 send(message);
3231}
3232
3234PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
3235{
3236 JLOG(p_journal_.trace()) << "getLedger: Ledger";
3237
3239
3240 if (m->has_ledgerhash())
3241 {
3242 // Attempt to find ledger by hash
3243 uint256 const ledgerHash{m->ledgerhash()};
3244 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3245 if (!ledger)
3246 {
3247 JLOG(p_journal_.trace())
3248 << "getLedger: Don't have ledger with hash " << ledgerHash;
3249
3250 if (m->has_querytype() && !m->has_requestcookie())
3251 {
3252 // Attempt to relay the request to a peer
3253 if (auto const peer = getPeerWithLedger(
3254 overlay_,
3255 ledgerHash,
3256 m->has_ledgerseq() ? m->ledgerseq() : 0,
3257 this))
3258 {
3259 m->set_requestcookie(id());
3260 peer->send(
3261 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3262 JLOG(p_journal_.debug())
3263 << "getLedger: Request relayed to peer";
3264 return ledger;
3265 }
3266
3267 JLOG(p_journal_.trace())
3268 << "getLedger: Failed to find peer to relay request";
3269 }
3270 }
3271 }
3272 else if (m->has_ledgerseq())
3273 {
3274 // Attempt to find ledger by sequence
3275 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3276 {
3277 JLOG(p_journal_.debug())
3278 << "getLedger: Early ledger sequence request";
3279 }
3280 else
3281 {
3282 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3283 if (!ledger)
3284 {
3285 JLOG(p_journal_.debug())
3286 << "getLedger: Don't have ledger with sequence "
3287 << m->ledgerseq();
3288 }
3289 }
3290 }
3291 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3292 {
3293 ledger = app_.getLedgerMaster().getClosedLedger();
3294 }
3295
3296 if (ledger)
3297 {
3298 // Validate retrieved ledger sequence
3299 auto const ledgerSeq{ledger->info().seq};
3300 if (m->has_ledgerseq())
3301 {
3302 if (ledgerSeq != m->ledgerseq())
3303 {
3304 // Do not resource charge a peer responding to a relay
3305 if (!m->has_requestcookie())
3306 charge(
3307 Resource::feeMalformedRequest, "get_ledger ledgerSeq");
3308
3309 ledger.reset();
3310 JLOG(p_journal_.warn())
3311 << "getLedger: Invalid ledger sequence " << ledgerSeq;
3312 }
3313 }
3314 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3315 {
3316 ledger.reset();
3317 JLOG(p_journal_.debug())
3318 << "getLedger: Early ledger sequence request " << ledgerSeq;
3319 }
3320 }
3321 else
3322 {
3323 JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
3324 }
3325
3326 return ledger;
3327}
3328
3330PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
3331{
3332 JLOG(p_journal_.trace()) << "getTxSet: TX set";
3333
3334 uint256 const txSetHash{m->ledgerhash()};
3336 app_.getInboundTransactions().getSet(txSetHash, false)};
3337 if (!shaMap)
3338 {
3339 if (m->has_querytype() && !m->has_requestcookie())
3340 {
3341 // Attempt to relay the request to a peer
3342 if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
3343 {
3344 m->set_requestcookie(id());
3345 peer->send(
3346 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3347 JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
3348 }
3349 else
3350 {
3351 JLOG(p_journal_.debug())
3352 << "getTxSet: Failed to find relay peer";
3353 }
3354 }
3355 else
3356 {
3357 JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
3358 }
3359 }
3360
3361 return shaMap;
3362}
3363
3364void
3365PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
3366{
3367 // Do not resource charge a peer responding to a relay
3368 if (!m->has_requestcookie())
3369 charge(
3370 Resource::feeModerateBurdenPeer, "received a get ledger request");
3371
3374 SHAMap const* map{nullptr};
3375 protocol::TMLedgerData ledgerData;
3376 bool fatLeaves{true};
3377 auto const itype{m->itype()};
3378
3379 if (itype == protocol::liTS_CANDIDATE)
3380 {
3381 if (sharedMap = getTxSet(m); !sharedMap)
3382 return;
3383 map = sharedMap.get();
3384
3385 // Fill out the reply
3386 ledgerData.set_ledgerseq(0);
3387 ledgerData.set_ledgerhash(m->ledgerhash());
3388 ledgerData.set_type(protocol::liTS_CANDIDATE);
3389 if (m->has_requestcookie())
3390 ledgerData.set_requestcookie(m->requestcookie());
3391
3392 // We'll already have most transactions
3393 fatLeaves = false;
3394 }
3395 else
3396 {
3397 if (send_queue_.size() >= Tuning::dropSendQueue)
3398 {
3399 JLOG(p_journal_.debug())
3400 << "processLedgerRequest: Large send queue";
3401 return;
3402 }
3403 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3404 {
3405 JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
3406 return;
3407 }
3408
3409 if (ledger = getLedger(m); !ledger)
3410 return;
3411
3412 // Fill out the reply
3413 auto const ledgerHash{ledger->info().hash};
3414 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3415 ledgerData.set_ledgerseq(ledger->info().seq);
3416 ledgerData.set_type(itype);
3417 if (m->has_requestcookie())
3418 ledgerData.set_requestcookie(m->requestcookie());
3419
3420 switch (itype)
3421 {
3422 case protocol::liBASE:
3423 sendLedgerBase(ledger, ledgerData);
3424 return;
3425
3426 case protocol::liTX_NODE:
3427 map = &ledger->txMap();
3428 JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
3429 << to_string(map->getHash());
3430 break;
3431
3432 case protocol::liAS_NODE:
3433 map = &ledger->stateMap();
3434 JLOG(p_journal_.trace())
3435 << "processLedgerRequest: Account state map hash "
3436 << to_string(map->getHash());
3437 break;
3438
3439 default:
3440 // This case should not be possible here
3441 JLOG(p_journal_.error())
3442 << "processLedgerRequest: Invalid ledger info type";
3443 return;
3444 }
3445 }
3446
3447 if (!map)
3448 {
3449 JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
3450 return;
3451 }
3452
3453 // Add requested node data to reply
3454 if (m->nodeids_size() > 0)
3455 {
3456 auto const queryDepth{
3457 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3458
3460
3461 for (int i = 0; i < m->nodeids_size() &&
3462 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3463 ++i)
3464 {
3465 auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
3466
3467 data.clear();
3468 data.reserve(Tuning::softMaxReplyNodes);
3469
3470 try
3471 {
3472 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3473 {
3474 JLOG(p_journal_.trace())
3475 << "processLedgerRequest: getNodeFat got "
3476 << data.size() << " nodes";
3477
3478 for (auto const& d : data)
3479 {
3480 if (ledgerData.nodes_size() >=
3481 Tuning::hardMaxReplyNodes)
3482 break;
3483 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3484 node->set_nodeid(d.first.getRawString());
3485 node->set_nodedata(d.second.data(), d.second.size());
3486 }
3487 }
3488 else
3489 {
3490 JLOG(p_journal_.warn())
3491 << "processLedgerRequest: getNodeFat returns false";
3492 }
3493 }
3494 catch (std::exception const& e)
3495 {
3496 std::string info;
3497 switch (itype)
3498 {
3499 case protocol::liBASE:
3500 // This case should not be possible here
3501 info = "Ledger base";
3502 break;
3503
3504 case protocol::liTX_NODE:
3505 info = "TX node";
3506 break;
3507
3508 case protocol::liAS_NODE:
3509 info = "AS node";
3510 break;
3511
3512 case protocol::liTS_CANDIDATE:
3513 info = "TS candidate";
3514 break;
3515
3516 default:
3517 info = "Invalid";
3518 break;
3519 }
3520
3521 if (!m->has_ledgerhash())
3522 info += ", no hash specified";
3523
3524 JLOG(p_journal_.warn())
3525 << "processLedgerRequest: getNodeFat with nodeId "
3526 << *shaMapNodeId << " and ledger info type " << info
3527 << " throws exception: " << e.what();
3528 }
3529 }
3530
3531 JLOG(p_journal_.info())
3532 << "processLedgerRequest: Got request for " << m->nodeids_size()
3533 << " nodes at depth " << queryDepth << ", return "
3534 << ledgerData.nodes_size() << " nodes";
3535 }
3536
3537 if (ledgerData.nodes_size() == 0)
3538 return;
3539
3540 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3541}
3542
3543int
3544PeerImp::getScore(bool haveItem) const
3545{
3546 // Random component of score, used to break ties and avoid
3547 // overloading the "best" peer
3548 static int const spRandomMax = 9999;
3549
3550 // Score for being very likely to have the thing we are
3551 // look for; should be roughly spRandomMax
3552 static int const spHaveItem = 10000;
3553
3554 // Score reduction for each millisecond of latency; should
3555 // be roughly spRandomMax divided by the maximum reasonable
3556 // latency
3557 static int const spLatency = 30;
3558
3559 // Penalty for unknown latency; should be roughly spRandomMax
3560 static int const spNoLatency = 8000;
3561
3562 int score = rand_int(spRandomMax);
3563
3564 if (haveItem)
3565 score += spHaveItem;
3566
3568 {
3569 std::lock_guard sl(recentLock_);
3570 latency = latency_;
3571 }
3572
3573 if (latency)
3574 score -= latency->count() * spLatency;
3575 else
3576 score -= spNoLatency;
3577
3578 return score;
3579}
3580
3581bool
3582PeerImp::isHighLatency() const
3583{
3584 std::lock_guard sl(recentLock_);
3585 return latency_ >= peerHighLatency;
3586}
3587
3588void
3589PeerImp::Metrics::add_message(std::uint64_t bytes)
3590{
3591 using namespace std::chrono_literals;
3592 std::unique_lock lock{mutex_};
3593
3594 totalBytes_ += bytes;
3595 accumBytes_ += bytes;
3596 auto const timeElapsed = clock_type::now() - intervalStart_;
3597 auto const timeElapsedInSecs =
3598 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3599
3600 if (timeElapsedInSecs >= 1s)
3601 {
3602 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3603 rollingAvg_.push_back(avgBytes);
3604
3605 auto const totalBytes =
3606 std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3607 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3608
3609 intervalStart_ = clock_type::now();
3610 accumBytes_ = 0;
3611 }
3612}
3613
3615PeerImp::Metrics::average_bytes() const
3616{
3617 std::shared_lock lock{mutex_};
3618 return rollingAvgBytes_;
3619}
3620
3622PeerImp::Metrics::total_bytes() const
3623{
3624 std::shared_lock lock{mutex_};
3625 return totalBytes_;
3626}
3627
3628} // namespace ripple
T accumulate(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:131
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:56
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:49
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Definition Journal.h:295
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
Definition Cluster.cpp:64
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:30
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
Definition Cluster.cpp:38
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:229
bool TX_REDUCE_RELAY_METRICS
Definition Config.h:246
int MAX_TRANSACTIONS
Definition Config.h:207
std::chrono::seconds MAX_DIVERGED_TIME
Definition Config.h:265
std::chrono::seconds MAX_UNKNOWN_TIME
Definition Config.h:262
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:160
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:123
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:36
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
Definition PeerImp.h:99
std::queue< std::shared_ptr< Message > > send_queue_
Definition PeerImp.h:224
std::unique_ptr< LoadEvent > load_event_
Definition PeerImp.h:239
boost::beast::http::fields const & headers_
Definition PeerImp.h:223
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Definition PeerImp.cpp:1138
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
Definition PeerImp.cpp:509
clock_type::duration uptime() const
Definition PeerImp.h:424
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
Definition PeerImp.cpp:325
protocol::TMStatusChange last_status_
Definition PeerImp.h:216
std::string name_
Definition PeerImp.h:148
boost::circular_buffer< uint256 > recentTxSets_
Definition PeerImp.h:159
std::unique_ptr< stream_type > stream_ptr_
Definition PeerImp.h:124
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Definition PeerImp.cpp:1147
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Definition PeerImp.h:102
Compressed compressionEnabled_
Definition PeerImp.h:244
uint256 closedLedgerHash_
Definition PeerImp.h:155
std::string domain() const
Definition PeerImp.cpp:880
std::optional< std::uint32_t > lastPingSeq_
Definition PeerImp.h:162
std::string const & fingerprint() const override
Definition PeerImp.h:676
beast::Journal const journal_
Definition PeerImp.h:122
virtual void run()
Definition PeerImp.cpp:144
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
Definition PeerImp.cpp:600
LedgerIndex maxLedger_
Definition PeerImp.h:154
beast::Journal const p_journal_
Definition PeerImp.h:123
bool const inbound_
Definition PeerImp.h:139
PeerImp(PeerImp const &)=delete
Application & app_
Definition PeerImp.h:116
void stop() override
Definition PeerImp.cpp:202
void shutdown()
Initiates the peer disconnection sequence.
Definition PeerImp.cpp:624
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
Definition PeerImp.cpp:551
bool hasTxSet(uint256 const &hash) const override
Definition PeerImp.cpp:533
clock_type::time_point lastPingTime_
Definition PeerImp.h:163
void onMessageUnknown(std::uint16_t type)
Definition PeerImp.cpp:1089
std::shared_ptr< PeerFinder::Slot > const slot_
Definition PeerImp.h:219
std::shared_mutex nameMutex_
Definition PeerImp.h:149
boost::circular_buffer< uint256 > recentLedgers_
Definition PeerImp.h:158
id_t const id_
Definition PeerImp.h:117
std::optional< std::chrono::milliseconds > latency_
Definition PeerImp.h:161
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
Definition PeerImp.cpp:1339
beast::IP::Endpoint const remote_address_
Definition PeerImp.h:134
Json::Value json() override
Definition PeerImp.cpp:374
PublicKey const publicKey_
Definition PeerImp.h:147
void close()
Forcibly closes the underlying socket connection.
Definition PeerImp.cpp:667
hash_set< uint256 > txQueue_
Definition PeerImp.h:249
std::mutex recentLock_
Definition PeerImp.h:215
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Definition PeerImp.cpp:1095
bool txReduceRelayEnabled_
Definition PeerImp.h:251
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
Definition PeerImp.cpp:561
clock_type::time_point trackingTime_
Definition PeerImp.h:145
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
Definition PeerImp.cpp:789
bool shutdownStarted_
Definition PeerImp.h:230
socket_type & socket_
Definition PeerImp.h:125
ProtocolVersion protocol_
Definition PeerImp.h:142
reduce_relay::Squelch< UptimeClock > squelch_
Definition PeerImp.h:166
bool writePending_
Definition PeerImp.h:236
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
Definition PeerImp.cpp:366
struct ripple::PeerImp::@22 metrics_
uint256 previousLedgerHash_
Definition PeerImp.h:156
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
Definition PeerImp.cpp:337
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
Definition PeerImp.cpp:722
void send(std::shared_ptr< Message > const &m) override
Definition PeerImp.cpp:222
std::string name() const
Definition PeerImp.cpp:873
boost::system::error_code error_code
Definition PeerImp.h:106
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:932
bool ledgerReplayEnabled_
Definition PeerImp.h:253
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
Definition PeerImp.h:113
bool crawl() const
Returns true if this connection will publicly share its IP address.
Definition PeerImp.cpp:351
waitable_timer timer_
Definition PeerImp.h:130
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
Definition PeerImp.cpp:693
void sendTxQueue() override
Send aggregated transactions' hashes.
Definition PeerImp.cpp:289
bool txReduceRelayEnabled() const override
Definition PeerImp.h:494
bool supportsFeature(ProtocolFeature f) const override
Definition PeerImp.cpp:492
ChargeWithContext fee_
Definition PeerImp.h:218
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
Definition PeerImp.cpp:1023
http_request_type request_
Definition PeerImp.h:221
OverlayImpl & overlay_
Definition PeerImp.h:138
LedgerIndex minLedger_
Definition PeerImp.h:153
virtual ~PeerImp()
Definition PeerImp.cpp:121
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
Definition PeerImp.cpp:308
stream_type & stream_
Definition PeerImp.h:126
static std::string makePrefix(std::string const &fingerprint)
Definition PeerImp.cpp:714
bool cluster() const override
Returns true if this connection is a member of the cluster.
Definition PeerImp.cpp:360
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
Definition PeerImp.cpp:641
boost::asio::strand< boost::asio::executor > strand_
Definition PeerImp.h:127
void cycleStatus() override
Definition PeerImp.cpp:541
boost::beast::multi_buffer read_buffer_
Definition PeerImp.h:220
Resource::Consumer usage_
Definition PeerImp.h:217
bool readPending_
Definition PeerImp.h:233
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
Definition PeerImp.cpp:524
void doProtocolStart()
Definition PeerImp.cpp:890
std::atomic< Tracking > tracking_
Definition PeerImp.h:144
Represents a peer connection in the overlay.
A public key.
Definition PublicKey.h:43
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
A consumption charge.
Definition Charge.h:11
An endpoint that consumes resources.
Definition Consumer.h:17
int balance()
Returns the credit balance representing consumption.
Definition Consumer.cpp:118
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition Consumer.cpp:105
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
Definition Consumer.cpp:87
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:78
std::size_t size() const noexcept
Definition Serializer.h:53
void const * data() const noexcept
Definition Serializer.h:59
int getLength() const
Definition Serializer.h:214
void const * getDataPtr() const
Definition Serializer.h:204
An immutable linear range of bytes.
Definition Slice.h:27
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:45
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
Definition base_uint.h:507
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition base_uint.h:484
T emplace_back(T... args)
T empty(T... args)
T find(T... args)
T for_each(T... args)
T get(T... args)
T is_same_v
T load(T... args)
T lock(T... args)
T max(T... args)
T min(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
unsigned int UInt
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
Definition PerfLog.h:168
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:95
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition STTx.cpp:851
static constexpr char FEATURE_COMPR[]
Definition Handshake.h:122
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
Definition PeerImp.cpp:138
static constexpr char FEATURE_LEDGER_REPLAY[]
Definition Handshake.h:128
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
HashRouterFlags
Definition HashRouter.h:15
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
Definition PeerImp.cpp:3169
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:225
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:100
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:14
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
Definition PublicKey.h:250
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
Definition PeerImp.cpp:3145
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
Definition Handshake.h:179
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
Definition apply.cpp:99
static constexpr char FEATURE_TXRR[]
Definition Handshake.h:126
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
std::optional< Rules > const & getCurrentTransactionRules()
Definition Rules.cpp:28
Number root(Number f, unsigned d)
Definition Number.cpp:617
@ manifest
Manifest.
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:25
@ jtLEDGER_REQ
Definition Job.h:40
@ jtPROPOSAL_ut
Definition Job.h:41
@ jtREPLAY_REQ
Definition Job.h:39
@ jtTRANSACTION
Definition Job.h:43
@ jtPEER
Definition Job.h:61
@ jtREQUESTED_TXN
Definition Job.h:45
@ jtMISSING_TXN
Definition Job.h:44
@ jtVALIDATION_t
Definition Job.h:52
@ jtMANIFEST
Definition Job.h:36
@ jtTXN_DATA
Definition Job.h:50
@ jtPACK
Definition Job.h:24
@ jtVALIDATION_ut
Definition Job.h:35
@ jtPROPOSAL_t
Definition Job.h:55
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:205
static constexpr char FEATURE_VPRR[]
Definition Handshake.h:124
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:42
STL namespace.
T nth_element(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Information about the notional ledger backing the view.
beast::IP::Address public_ip
Definition Overlay.h:50
std::optional< std::uint32_t > networkID
Definition Overlay.h:53
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Definition PeerImp.h:201
Describes a single consumer.
Definition Gossip.h:18
beast::IP::Endpoint address
Definition Gossip.h:22
Data format for exchanging consumption information across peers.
Definition Gossip.h:13
std::vector< Item > items
Definition Gossip.h:25
T tie(T... args)
T to_string(T... args)
T what(T... args)