rippled
Loading...
Searching...
No Matches
ConnectAttempt.cpp
1#include <xrpld/overlay/Cluster.h>
2#include <xrpld/overlay/detail/ConnectAttempt.h>
3#include <xrpld/overlay/detail/PeerImp.h>
4#include <xrpld/overlay/detail/ProtocolVersion.h>
5
6#include <xrpl/json/json_reader.h>
7
8#include <sstream>
9
10namespace xrpl {
11
// Constructor for ConnectAttempt. NOTE(review): the opening line and some
// parameter lines (listing lines 12, 16, 18, 19) are absent from this listing.
// The initializer list stores the arguments, creates a strand and two timers
// on io_context, and builds the SSL stream over a freshly constructed socket.
13 Application& app,
14 boost::asio::io_context& io_context,
15 endpoint_type const& remote_endpoint,
17 shared_context const& context,
20 beast::Journal journal,
21 OverlayImpl& overlay)
22 : Child(overlay)
23 , app_(app)
24 , id_(id)
25 , sink_(journal, OverlayImpl::makePrefix(id))
26 , journal_(sink_)
27 , remote_endpoint_(remote_endpoint)
28 , usage_(usage)
29 , strand_(boost::asio::make_strand(io_context))
30 , timer_(io_context)
31 , stepTimer_(io_context)
// NOTE(review): std::forward with an explicit lvalue-reference type just
// yields the same lvalue, so passing io_context directly looks equivalent.
32 , stream_ptr_(
33 std::make_unique<stream_type>(socket_type(std::forward<boost::asio::io_context&>(io_context)), *context))
// socket_ and stream_ are initialized from the object owned by stream_ptr_.
34 , socket_(stream_ptr_->next_layer().socket())
35 , stream_(*stream_ptr_)
36 , slot_(slot)
37{
38}
39
// NOTE(review): the signature line (listing line 40) is absent from this
// listing; presumably this is the ConnectAttempt destructor - confirm.
41{
42 // slot_ will be null if we successfully connected
43 // and transferred ownership to a PeerImp
// NOTE(review): the statement guarded by this `if` (listing line 45) is also
// absent from this listing.
44 if (slot_ != nullptr)
46}
47
// Stop the connection attempt.
// When invoked off the strand, the call re-posts itself onto strand_ and
// returns. On the strand it does nothing if the socket is already closed;
// otherwise it logs at debug level and calls shutdown().
// NOTE(review): the function-name line (listing line 49) is absent from this
// listing; the self-bind below shows this is ConnectAttempt::stop.
48void
50{
51 if (!strand_.running_in_this_thread())
52 return boost::asio::post(strand_, std::bind(&ConnectAttempt::stop, shared_from_this()));
53
54 if (!socket_.is_open())
55 return;
56
57 JLOG(journal_.debug()) << "stop: Stop";
58
59 shutdown();
60}
61
// Begin the connection attempt.
// When invoked off the strand, the call re-posts itself onto strand_. On the
// strand it logs the target endpoint, marks an I/O operation as pending and
// starts an asynchronous connect whose completion is delivered to onConnect
// through the strand.
// NOTE(review): the function-name line (listing line 63) is absent from this
// listing; the self-bind below shows this is ConnectAttempt::run.
62void
64{
65 if (!strand_.running_in_this_thread())
66 return boost::asio::post(strand_, std::bind(&ConnectAttempt::run, shared_from_this()));
67
68 JLOG(journal_.debug()) << "run: connecting to " << remote_endpoint_;
69
// Set before the async operation starts; onConnect clears it again.
70 ioPending_ = true;
71
72 // Allow up to connectTimeout_ seconds to establish remote peer connection
// NOTE(review): the statement this comment describes (listing line 73) is
// absent from this listing; presumably a setTimer(...) call.
74
// NOTE(review): the first argument of async_connect (listing line 76) is
// absent from this listing.
75 stream_.next_layer().async_connect(
77 boost::asio::bind_executor(
78 strand_, std::bind(&ConnectAttempt::onConnect, shared_from_this(), std::placeholders::_1)));
79}
80
81//------------------------------------------------------------------------------
82
// Request shutdown of the connection (name taken from the assertion text
// below; the function-name line, listing line 84, is absent from this
// listing). Must run on strand_. Does nothing if the socket is already
// closed; otherwise sets shutdown_ and cancels outstanding operations on the
// lowest stream layer.
83void
85{
86 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::ConnectAttempt::shutdown: strand in this thread");
87
88 if (!socket_.is_open())
89 return;
90
// The completion handlers check shutdown_ and call tryAsyncShutdown().
91 shutdown_ = true;
92 boost::beast::get_lowest_layer(stream_).cancel();
93
// NOTE(review): the final statement (listing line 94) is absent from this
// listing; presumably a call to tryAsyncShutdown() - confirm.
95}
96
// Try to complete a shutdown (name taken from the assertion text below; the
// function-name line, listing line 98, is absent from this listing). Must run
// on strand_. Returns without acting while an I/O operation is still pending;
// otherwise it either starts an asynchronous SSL shutdown, completed by
// onShutdown, or falls through to close().
97void
99{
100 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::ConnectAttempt::tryAsyncShutdown : strand in this thread");
101
// NOTE(review): the condition guarding this return (listing line 102) is
// absent from this listing.
103 return;
104
// Wait for the outstanding handler; it calls back into this function.
105 if (ioPending_)
106 return;
107
108 // gracefully shutdown the SSL socket, performing a shutdown handshake
// NOTE(review): the condition selecting this branch (listing line 109) and
// the first statement inside it (listing line 111) are absent from this
// listing.
110 {
112 return stream_.async_shutdown(
113 bind_executor(strand_, std::bind(&ConnectAttempt::onShutdown, shared_from_this(), std::placeholders::_1)));
114 }
115
116 close();
117}
118
// Completion handler for the asynchronous SSL shutdown started in
// tryAsyncShutdown(). Cancels both timers, logs unexpected errors at debug
// level and then closes the socket regardless of the outcome.
// NOTE(review): the function-name line (listing line 120) is absent from this
// listing; the parameter is referred to as `ec` in the body.
119void
121{
122 cancelTimer();
123
124 if (ec)
125 {
126 // - eof: the stream was cleanly closed
127 // - operation_aborted: an expired timer (slow shutdown)
128 // - stream_truncated: the tcp connection closed (no handshake) it could
129 // occur if a peer does not perform a graceful disconnect
130 // - broken_pipe: the peer is gone
131 // - application data after close notify: benign SSL shutdown condition
// NOTE(review): only eof, operation_aborted and the "application data after
// close notify" message are filtered below; stream_truncated and broken_pipe
// are listed above but are still logged.
132 bool shouldLog =
133 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
134 ec.message().find("application data after close notify") == std::string::npos);
135
136 if (shouldLog)
137 {
138 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
139 }
140 }
141
142 close();
143}
144
// Close the socket (name taken from the assertion text below; the
// function-name line, listing line 146, is absent from this listing). Must
// run on strand_. Does nothing if the socket is already closed; otherwise
// cancels both timers and closes the socket.
145void
147{
148 XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::ConnectAttempt::close : strand in this thread");
149 if (!socket_.is_open())
150 return;
151
152 cancelTimer();
153
// The error code from close() is collected but deliberately not inspected.
154 error_code ec;
155 socket_.close(ec);
156}
157
// Log `reason` at debug level and begin shutting the connection down.
// NOTE(review): the function-name line (listing line 159) is absent from this
// listing; presumably the fail(std::string const& reason) overload.
158void
160{
161 JLOG(journal_.debug()) << reason;
162 shutdown();
163}
164
// Log "<name>: <error message>" at debug level and begin shutting the
// connection down.
// NOTE(review): the function-name line (listing line 166) is absent from this
// listing; presumably the fail overload taking a name and an error_code.
165void
167{
168 JLOG(journal_.debug()) << name << ": " << ec.message();
169 shutdown();
170}
171
// Set timers for the connection step `step`.
// Records the step in currentStep_, arms the global timer once (duration
// connectTimeout), then arms the per-step timer with the timeout chosen by
// the switch. Both timers report to onTimer via strand_. If arming a timer
// throws, the error is logged and the socket is closed.
// NOTE(review): the function-name line (listing line 173) is absent from this
// listing; presumably ConnectAttempt::setTimer(ConnectionStep step).
172void
174{
175 currentStep_ = step;
176
177 // Set global timer (only if not already set)
// NOTE(review): relies on a never-armed timer reporting a default-constructed
// expiry - confirm against the Boost.Asio documentation.
178 if (timer_.expiry() == std::chrono::steady_clock::time_point{})
179 {
180 try
181 {
182 timer_.expires_after(connectTimeout);
183 timer_.async_wait(boost::asio::bind_executor(
184 strand_, std::bind(&ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));
185 }
186 catch (std::exception const& ex)
187 {
188 JLOG(journal_.error()) << "setTimer (global): " << ex.what();
189 return close();
190 }
191 }
192
193 // Set step-specific timer
194 try
195 {
196 std::chrono::seconds stepTimeout;
197 switch (step)
198 {
// NOTE(review): every case label (listing lines 199, 202, 205, 208, 211, 214,
// 215) is absent from this listing; only the assignments remain.
200 stepTimeout = StepTimeouts::tcpConnect;
201 break;
203 stepTimeout = StepTimeouts::tlsHandshake;
204 break;
206 stepTimeout = StepTimeouts::httpWrite;
207 break;
209 stepTimeout = StepTimeouts::httpRead;
210 break;
212 stepTimeout = StepTimeouts::tlsShutdown;
213 break;
216 return; // No timer needed for init or complete step
217 }
218
219 // call to expires_after cancels previous timer
220 stepTimer_.expires_after(stepTimeout);
221 stepTimer_.async_wait(boost::asio::bind_executor(
222 strand_, std::bind(&ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));
223
224 JLOG(journal_.trace()) << "setTimer: " << stepToString(step) << " timeout=" << stepTimeout.count() << "s";
225 }
226 catch (std::exception const& ex)
227 {
228 JLOG(journal_.error()) << "setTimer (step " << stepToString(step) << "): " << ex.what();
229 return close();
230 }
231}
232
// Cancel both the global timer and the step timer. A
// boost::system::system_error thrown by either cancel() is swallowed.
// NOTE(review): the function-name line (listing line 234) is absent from this
// listing; presumably ConnectAttempt::cancelTimer().
233void
235{
236 try
237 {
238 timer_.cancel();
239 stepTimer_.cancel();
240 }
241 catch (boost::system::system_error const&)
242 {
243 // ignored
244 }
245}
246
// Wait handler shared by the global timer and the step timer.
// Ignored when the socket is already closed or the wait was cancelled. Any
// other error is logged and closes the socket. On a real expiry it logs which
// timer fired together with the current step, then closes the socket.
// NOTE(review): the function-name line (listing line 248) is absent from this
// listing; presumably ConnectAttempt::onTimer(error_code ec).
247void
249{
250 if (!socket_.is_open())
251 return;
252
253 if (ec)
254 {
255 // do not initiate shutdown, timers are frequently cancelled
256 if (ec == boost::asio::error::operation_aborted)
257 return;
258
259 // This should never happen
260 JLOG(journal_.error()) << "onTimer: " << ec.message();
261 return close();
262 }
263
264 // Determine which timer expired by checking their expiry times
265 auto const now = std::chrono::steady_clock::now();
266 bool globalExpired = (timer_.expiry() <= now);
267 bool stepExpired = (stepTimer_.expiry() <= now);
268
// The global timer is checked first, so when both have expired the event is
// reported as a global timeout.
269 if (globalExpired)
270 {
271 JLOG(journal_.debug()) << "onTimer: Global timeout; step: " << stepToString(currentStep_);
272 }
273 else if (stepExpired)
274 {
275 JLOG(journal_.debug()) << "onTimer: Step timeout; step: " << stepToString(currentStep_);
276 }
277 else
278 {
279 JLOG(journal_.warn()) << "onTimer: Unexpected timer callback";
280 }
281
// Every path that reaches this point closes the socket directly.
282 close();
283}
284
// Completion handler for the asynchronous connect started in run().
// Clears ioPending_, handles cancellation and errors, verifies that the
// socket has a local endpoint, honours a pending shutdown request and then
// starts the client-side TLS handshake, completed by onHandshake.
// NOTE(review): the function-name line (listing line 286) is absent from this
// listing; presumably ConnectAttempt::onConnect(error_code ec).
285void
287{
288 ioPending_ = false;
289
290 if (ec)
291 {
// A cancelled operation is routed to the shutdown path rather than fail().
292 if (ec == boost::asio::error::operation_aborted)
293 return tryAsyncShutdown();
294
295 return fail("onConnect", ec);
296 }
297
298 if (!socket_.is_open())
299 return;
300
301 // check if connection has really been established
302 socket_.local_endpoint(ec);
303 if (ec)
304 return fail("onConnect", ec);
305
306 if (shutdown_)
307 return tryAsyncShutdown();
308
309 ioPending_ = true;
310
// NOTE(review): a statement (listing line 311) is absent from this listing;
// presumably the setTimer(...) call for the handshake step.
312
// Certificate verification is turned off for this handshake.
313 stream_.set_verify_mode(boost::asio::ssl::verify_none);
314 stream_.async_handshake(
315 boost::asio::ssl::stream_base::client,
316 boost::asio::bind_executor(
317 strand_, std::bind(&ConnectAttempt::onHandshake, shared_from_this(), std::placeholders::_1)));
318}
319
// Completion handler for the TLS handshake started in onConnect().
// Clears ioPending_, handles cancellation and errors, obtains the local
// endpoint, rejects self connections, derives the shared value from the SSL
// session, and writes the HTTP request; the write is completed by onWrite.
// NOTE(review): the function-name line (listing line 321) is absent from this
// listing; presumably ConnectAttempt::onHandshake(error_code ec). Several
// body lines are absent as well and are flagged where they occur.
320void
322{
323 ioPending_ = false;
324
325 if (ec)
326 {
327 if (ec == boost::asio::error::operation_aborted)
328 return tryAsyncShutdown();
329
330 return fail("onHandshake", ec);
331 }
332
333 auto const local_endpoint = socket_.local_endpoint(ec);
334 if (ec)
335 return fail("onHandshake", ec);
336
// NOTE(review): a statement (listing line 337) is absent from this listing.
338
339 // check if we connected to ourselves
// NOTE(review): the condition guarding this return (listing line 340) is
// absent from this listing.
341 return fail("Self connection");
342
343 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
344 if (!sharedValue)
345 return shutdown(); // makeSharedValue logs
346
// NOTE(review): listing lines 347-352 and 354 are absent from this listing;
// presumably they build req_ and open the call whose arguments follow.
353
355 req_, *sharedValue, overlay_.setup().networkID, overlay_.setup().public_ip, remote_endpoint_.address(), app_);
356
357 if (shutdown_)
358 return tryAsyncShutdown();
359
360 ioPending_ = true;
361
362 boost::beast::http::async_write(
363 stream_,
364 req_,
365 boost::asio::bind_executor(
366 strand_, std::bind(&ConnectAttempt::onWrite, shared_from_this(), std::placeholders::_1)));
367}
368
// Completion handler for the HTTP request write started in onHandshake().
// Clears ioPending_, handles cancellation and errors, honours a pending
// shutdown request and then starts reading the HTTP response into response_;
// the read is completed by onRead.
// NOTE(review): the function-name line (listing line 370) is absent from this
// listing; presumably ConnectAttempt::onWrite(error_code ec).
369void
371{
372 ioPending_ = false;
373
374 if (ec)
375 {
376 if (ec == boost::asio::error::operation_aborted)
377 return tryAsyncShutdown();
378
379 return fail("onWrite", ec);
380 }
381
382 if (shutdown_)
383 return tryAsyncShutdown();
384
385 ioPending_ = true;
386
// NOTE(review): a statement (listing line 387) is absent from this listing;
// presumably the setTimer(...) call for the read step.
388
389 boost::beast::http::async_read(
390 stream_,
391 read_buf_,
392 response_,
393 boost::asio::bind_executor(
394 strand_, std::bind(&ConnectAttempt::onRead, shared_from_this(), std::placeholders::_1)));
395}
396
// Completion handler for the HTTP response read started in onWrite().
// Cancels both timers and clears ioPending_, then handles EOF, cancellation
// and other errors, and honours a pending shutdown request.
// NOTE(review): the function-name line (listing line 398) is absent from this
// listing; presumably ConnectAttempt::onRead(error_code ec).
397void
399{
400 cancelTimer();
401 ioPending_ = false;
// NOTE(review): a statement (listing line 402) is absent from this listing.
403
404 if (ec)
405 {
// EOF is logged and goes through shutdown() instead of fail().
406 if (ec == boost::asio::error::eof)
407 {
408 JLOG(journal_.debug()) << "EOF";
409 return shutdown();
410 }
411
412 if (ec == boost::asio::error::operation_aborted)
413 return tryAsyncShutdown();
414
415 return fail("onRead", ec);
416 }
417
418 if (shutdown_)
419 return tryAsyncShutdown();
420
// NOTE(review): the final statement (listing line 421) is absent from this
// listing; presumably the call to processResponse().
422}
423
424//--------------------------------------------------------------------------
425
// Process the HTTP upgrade response from the peer.
// The first branch handles a response that is not an upgrade: anything other
// than service_unavailable is rejected, and a service_unavailable body
// carrying a "peer-ips" array is treated as a redirect whose endpoints are
// handed to PeerFinder. Otherwise the negotiated protocol version is checked,
// the handshake is verified, the slot is activated and a PeerImp is created
// and registered with the overlay. Every failure path ends in fail() or
// shutdown().
// NOTE(review): the function-name line (listing line 427) is absent from this
// listing; presumably ConnectAttempt::processResponse(). Other absent lines
// are flagged where they occur.
426void
428{
// NOTE(review): the condition opening this branch (listing line 429) is
// absent from this listing; presumably a check that the response is not a
// peer-protocol upgrade.
430 {
431 // A peer may respond with service_unavailable and a list of alternative
432 // peers to connect to, a differing status code is unexpected
433 if (response_.result() != boost::beast::http::status::service_unavailable)
434 {
435 JLOG(journal_.warn()) << "Unable to upgrade to peer protocol: " << response_.result() << " ("
436 << response_.reason() << ")";
437 return shutdown();
438 }
439
440 // Parse response body to determine if this is a redirect or other
441 // service unavailable
// Flatten the body's buffer sequence into one contiguous string.
442 std::string responseBody;
443 responseBody.reserve(boost::asio::buffer_size(response_.body().data()));
444 for (auto const buffer : response_.body().data())
445 responseBody.append(static_cast<char const*>(buffer.data()), boost::asio::buffer_size(buffer));
446
447 Json::Value json;
448 Json::Reader reader;
449 auto const isValidJson = reader.parse(responseBody, json);
450
451 // Check if this is a redirect response (contains peer-ips field)
452 auto const isRedirect = isValidJson && json.isObject() && json.isMember("peer-ips");
453
454 if (!isRedirect)
455 {
456 JLOG(journal_.warn()) << "processResponse: " << remote_endpoint_
457 << " failed to upgrade to peer protocol: " << response_.result() << " ("
458 << response_.reason() << ")";
459
460 return shutdown();
461 }
462
463 Json::Value const& peerIps = json["peer-ips"];
464 if (!peerIps.isArray())
465 return fail("processResponse: invalid peer-ips format");
466
467 // Extract and validate peer endpoints
// NOTE(review): the declaration of redirectEndpoints (listing line 468) is
// absent from this listing.
469 redirectEndpoints.reserve(peerIps.size());
470
// Entries that are not strings or fail to parse are skipped silently.
471 for (auto const& ipValue : peerIps)
472 {
473 if (!ipValue.isString())
474 continue;
475
476 error_code ec;
477 auto const endpoint = parse_endpoint(ipValue.asString(), ec);
478 if (!ec)
479 redirectEndpoints.push_back(endpoint);
480 }
481
482 // Notify PeerFinder about the redirect; redirectEndpoints may be empty
483 overlay_.peerFinder().onRedirects(remote_endpoint_, redirectEndpoints);
484
485 return fail("processResponse: failed to connect to peer: redirected");
486 }
487
488 // Just because our peer selected a particular protocol version doesn't
489 // mean that it's acceptable to us. Check that it is:
490 std::optional<ProtocolVersion> negotiatedProtocol;
491
492 {
493 auto const pvs = parseProtocolVersions(response_["Upgrade"]);
494
// Exactly one version must be offered, and it must be one we support.
495 if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
496 negotiatedProtocol = pvs[0];
497
498 if (!negotiatedProtocol)
499 return fail("processResponse: Unable to negotiate protocol version");
500 }
501
502 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
503 if (!sharedValue)
504 return shutdown(); // makeSharedValue logs
505
// Exceptions thrown inside this block are converted to a "Handshake failure"
// by the catch clause at the end.
506 try
507 {
508 auto const publicKey = verifyHandshake(
509 response_,
510 *sharedValue,
// NOTE(review): two arguments (listing lines 511 and 512) are absent from
// this listing.
513 remote_endpoint_.address(),
514 app_);
515
516 usage_.setPublicKey(publicKey);
517
518 JLOG(journal_.debug()) << "Protocol: " << to_string(*negotiatedProtocol);
519 JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
520
521 auto const member = app_.cluster().member(publicKey);
522 if (member)
523 {
524 JLOG(journal_.info()) << "Cluster name: " << *member;
525 }
526
// Cluster membership is passed to activate() as its third argument.
527 auto const result = overlay_.peerFinder().activate(slot_, publicKey, member.has_value());
528 if (result != PeerFinder::Result::success)
529 {
// NOTE(review): the declaration of ss (listing line 530) is absent from this
// listing.
531 ss << "Outbound Connect Attempt " << remote_endpoint_ << " " << to_string(result);
532 return fail(ss.str());
533 }
534
535 if (!socket_.is_open())
536 return;
537
538 if (shutdown_)
539 return tryAsyncShutdown();
540
// stream_ptr_, slot_ and response_ are moved into the new PeerImp.
541 auto const peer = std::make_shared<PeerImp>(
542 app_,
543 std::move(stream_ptr_),
544 read_buf_.data(),
545 std::move(slot_),
546 std::move(response_),
547 usage_,
548 publicKey,
549 *negotiatedProtocol,
550 id_,
551 overlay_);
552
553 overlay_.add_active(peer);
554 }
555 catch (std::exception const& e)
556 {
557 return fail(std::string("Handshake failure (") + e.what() + ")");
558 }
559}
560
561} // namespace xrpl
T append(T... args)
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:18
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:131
bool isArray() const
UInt size() const
Number of values in array or object.
bool isObject() const
bool isMember(char const *key) const
Return true if the object has a member named key.
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Cluster & cluster()=0
virtual Config & config()=0
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
bool COMPRESSION
Definition Config.h:201
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:239
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:229
bool LEDGER_REPLAY
Definition Config.h:204
boost::system::error_code error_code
boost::beast::ssl_stream< middle_type > stream_type
void fail(std::string const &reason)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
void setTimer(ConnectionStep step)
Set timers for the specified connection step.
void processResponse()
Process the HTTP upgrade response from peer.
boost::asio::strand< boost::asio::io_context::executor_type > strand_
void stop() override
Stop the connection attempt.
ConnectionStep currentStep_
void onRead(error_code ec)
static std::string stepToString(ConnectionStep step)
std::unique_ptr< stream_type > stream_ptr_
void onConnect(error_code ec)
void onShutdown(error_code ec)
std::shared_ptr< PeerFinder::Slot > slot_
ConnectionStep
Represents the current phase of the connection establishment process.
response_type response_
boost::asio::ip::tcp::socket socket_type
boost::beast::multi_buffer read_buf_
void run()
Begin the connection attempt.
Peer::id_t const id_
boost::asio::ip::tcp::endpoint endpoint_type
endpoint_type remote_endpoint_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > stepTimer_
void onHandshake(error_code ec)
static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const &s, boost::system::error_code &ec)
ConnectAttempt(Application &app, boost::asio::io_context &io_context, endpoint_type const &remote_endpoint, Resource::Consumer usage, shared_context const &context, Peer::id_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
Construct a new ConnectAttempt object.
static constexpr std::chrono::seconds connectTimeout
Resource::Consumer usage_
beast::Journal const journal_
void onWrite(error_code ec)
void onTimer(error_code ec)
Handle timer expiration events.
void cancelTimer()
Cancel both global and step timers.
static bool isPeerUpgrade(http_request_type const &request)
void add_active(std::shared_ptr< PeerImp > const &peer)
PeerFinder::Manager & peerFinder()
Setup const & setup() const
virtual void onRedirects(boost::asio::ip::tcp::endpoint const &remote_address, std::vector< boost::asio::ip::tcp::endpoint > const &eps)=0
Called when we received redirect IPs from a busy peer.
virtual Result activate(std::shared_ptr< Slot > const &slot, PublicKey const &key, bool reserved)=0
Request an active slot type.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual Config config()=0
Returns the configuration for the manager.
virtual bool onConnected(std::shared_ptr< Slot > const &slot, beast::IP::Endpoint const &local_endpoint)=0
Called when an outbound connection attempt succeeds.
An endpoint that consumes resources.
Definition Consumer.h:17
void setPublicKey(PublicKey const &publicKey)
Definition Consumer.cpp:129
T count(T... args)
T is_same_v
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:598
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
PublicKey verifyHandshake(boost::beast::http::fields const &headers, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
void buildHandshake(boost::beast::http::fields &h, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
T push_back(T... args)
T reserve(T... args)
T str(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
static constexpr std::chrono::seconds tcpConnect
static constexpr std::chrono::seconds tlsHandshake
static constexpr std::chrono::seconds httpWrite
static constexpr std::chrono::seconds httpRead
static constexpr std::chrono::seconds tlsShutdown
std::optional< std::uint32_t > networkID
Definition Overlay.h:53
beast::IP::Address public_ip
Definition Overlay.h:50
bool peerPrivate
true if we want our IP address kept private.
T what(T... args)