rippled
Loading...
Searching...
No Matches
ConnectAttempt.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/overlay/Cluster.h>
21#include <xrpld/overlay/detail/ConnectAttempt.h>
22#include <xrpld/overlay/detail/PeerImp.h>
23#include <xrpld/overlay/detail/ProtocolVersion.h>
24
25#include <xrpl/json/json_reader.h>
26
27namespace ripple {
28
30 Application& app,
31 boost::asio::io_context& io_context,
32 endpoint_type const& remote_endpoint,
34 shared_context const& context,
37 beast::Journal journal,
38 OverlayImpl& overlay)
39 : Child(overlay)
40 , app_(app)
41 , id_(id)
42 , sink_(journal, OverlayImpl::makePrefix(id))
43 , journal_(sink_)
44 , remote_endpoint_(remote_endpoint)
45 , usage_(usage)
46 , strand_(boost::asio::make_strand(io_context))
47 , timer_(io_context)
48 , stream_ptr_(std::make_unique<stream_type>(
49 socket_type(std::forward<boost::asio::io_context&>(io_context)),
50 *context))
51 , socket_(stream_ptr_->next_layer().socket())
52 , stream_(*stream_ptr_)
53 , slot_(slot)
54{
55 JLOG(journal_.debug()) << "Connect " << remote_endpoint;
56}
57
59{
60 if (slot_ != nullptr)
62 JLOG(journal_.trace()) << "~ConnectAttempt";
63}
64
65void
67{
68 if (!strand_.running_in_this_thread())
69 return boost::asio::post(
71 if (socket_.is_open())
72 {
73 JLOG(journal_.debug()) << "Stop";
74 }
75 close();
76}
77
78void
80{
81 stream_.next_layer().async_connect(
83 boost::asio::bind_executor(
84 strand_,
88 std::placeholders::_1)));
89}
90
91//------------------------------------------------------------------------------
92
93void
95{
96 XRPL_ASSERT(
97 strand_.running_in_this_thread(),
98 "ripple::ConnectAttempt::close : strand in this thread");
99 if (socket_.is_open())
100 {
101 try
102 {
103 timer_.cancel();
104 socket_.close();
105 }
106 catch (boost::system::system_error const&)
107 {
108 // ignored
109 }
110
111 JLOG(journal_.debug()) << "Closed";
112 }
113}
114
115void
117{
118 JLOG(journal_.debug()) << reason;
119 close();
120}
121
122void
124{
125 JLOG(journal_.debug()) << name << ": " << ec.message();
126 close();
127}
128
129void
131{
132 try
133 {
134 timer_.expires_after(std::chrono::seconds(15));
135 }
136 catch (boost::system::system_error const& e)
137 {
138 JLOG(journal_.error()) << "setTimer: " << e.code();
139 return;
140 }
141
142 timer_.async_wait(boost::asio::bind_executor(
143 strand_,
144 std::bind(
147 std::placeholders::_1)));
148}
149
150void
152{
153 try
154 {
155 timer_.cancel();
156 }
157 catch (boost::system::system_error const&)
158 {
159 // ignored
160 }
161}
162
163void
165{
166 if (!socket_.is_open())
167 return;
168 if (ec == boost::asio::error::operation_aborted)
169 return;
170 if (ec)
171 {
172 // This should never happen
173 JLOG(journal_.error()) << "onTimer: " << ec.message();
174 return close();
175 }
176 fail("Timeout");
177}
178
179void
181{
182 cancelTimer();
183
184 if (ec == boost::asio::error::operation_aborted)
185 return;
186 endpoint_type local_endpoint;
187 if (!ec)
188 local_endpoint = socket_.local_endpoint(ec);
189 if (ec)
190 return fail("onConnect", ec);
191 if (!socket_.is_open())
192 return;
193 JLOG(journal_.trace()) << "onConnect";
194
195 setTimer();
196 stream_.set_verify_mode(boost::asio::ssl::verify_none);
197 stream_.async_handshake(
198 boost::asio::ssl::stream_base::client,
199 boost::asio::bind_executor(
200 strand_,
201 std::bind(
204 std::placeholders::_1)));
205}
206
207void
209{
210 cancelTimer();
211 if (!socket_.is_open())
212 return;
213 if (ec == boost::asio::error::operation_aborted)
214 return;
215 endpoint_type local_endpoint;
216 if (!ec)
217 local_endpoint = socket_.local_endpoint(ec);
218 if (ec)
219 return fail("onHandshake", ec);
220 JLOG(journal_.trace()) << "onHandshake";
221
224 return fail("Duplicate connection");
225
226 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
227 if (!sharedValue)
228 return close(); // makeSharedValue logs
229
236
238 req_,
239 *sharedValue,
242 remote_endpoint_.address(),
243 app_);
244
245 setTimer();
246 boost::beast::http::async_write(
247 stream_,
248 req_,
249 boost::asio::bind_executor(
250 strand_,
251 std::bind(
254 std::placeholders::_1)));
255}
256
257void
259{
260 cancelTimer();
261 if (!socket_.is_open())
262 return;
263 if (ec == boost::asio::error::operation_aborted)
264 return;
265 if (ec)
266 return fail("onWrite", ec);
267 boost::beast::http::async_read(
268 stream_,
269 read_buf_,
270 response_,
271 boost::asio::bind_executor(
272 strand_,
273 std::bind(
276 std::placeholders::_1)));
277}
278
279void
281{
282 cancelTimer();
283
284 if (!socket_.is_open())
285 return;
286 if (ec == boost::asio::error::operation_aborted)
287 return;
288 if (ec == boost::asio::error::eof)
289 {
290 JLOG(journal_.info()) << "EOF";
291 setTimer();
292 return stream_.async_shutdown(boost::asio::bind_executor(
293 strand_,
294 std::bind(
297 std::placeholders::_1)));
298 }
299 if (ec)
300 return fail("onRead", ec);
302}
303
304void
306{
307 cancelTimer();
308 if (!ec)
309 {
310 JLOG(journal_.error()) << "onShutdown: expected error condition";
311 return close();
312 }
313 if (ec != boost::asio::error::eof)
314 return fail("onShutdown", ec);
315 close();
316}
317
318//--------------------------------------------------------------------------
319
320void
322{
323 if (response_.result() == boost::beast::http::status::service_unavailable)
324 {
325 Json::Value json;
326 Json::Reader r;
327 std::string s;
328 s.reserve(boost::asio::buffer_size(response_.body().data()));
329 for (auto const buffer : response_.body().data())
330 s.append(
331 static_cast<char const*>(buffer.data()),
332 boost::asio::buffer_size(buffer));
333 auto const success = r.parse(s, json);
334 if (success)
335 {
336 if (json.isObject() && json.isMember("peer-ips"))
337 {
338 Json::Value const& ips = json["peer-ips"];
339 if (ips.isArray())
340 {
342 eps.reserve(ips.size());
343 for (auto const& v : ips)
344 {
345 if (v.isString())
346 {
347 error_code ec;
348 auto const ep = parse_endpoint(v.asString(), ec);
349 if (!ec)
350 eps.push_back(ep);
351 }
352 }
354 }
355 }
356 }
357 }
358
360 {
361 JLOG(journal_.info())
362 << "Unable to upgrade to peer protocol: " << response_.result()
363 << " (" << response_.reason() << ")";
364 return close();
365 }
366
367 // Just because our peer selected a particular protocol version doesn't
368 // mean that it's acceptable to us. Check that it is:
369 std::optional<ProtocolVersion> negotiatedProtocol;
370
371 {
372 auto const pvs = parseProtocolVersions(response_["Upgrade"]);
373
374 if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
375 negotiatedProtocol = pvs[0];
376
377 if (!negotiatedProtocol)
378 return fail(
379 "processResponse: Unable to negotiate protocol version");
380 }
381
382 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
383 if (!sharedValue)
384 return close(); // makeSharedValue logs
385
386 try
387 {
388 auto publicKey = verifyHandshake(
389 response_,
390 *sharedValue,
393 remote_endpoint_.address(),
394 app_);
395
396 JLOG(journal_.info())
397 << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
398
399 JLOG(journal_.debug())
400 << "Protocol: " << to_string(*negotiatedProtocol);
401
402 auto const member = app_.cluster().member(publicKey);
403 if (member)
404 {
405 JLOG(journal_.info()) << "Cluster name: " << *member;
406 }
407
408 auto const result = overlay_.peerFinder().activate(
409 slot_, publicKey, static_cast<bool>(member));
410 if (result != PeerFinder::Result::success)
411 return fail("Outbound " + std::string(to_string(result)));
412
413 auto const peer = std::make_shared<PeerImp>(
414 app_,
415 std::move(stream_ptr_),
416 read_buf_.data(),
417 std::move(slot_),
418 std::move(response_),
419 usage_,
420 publicKey,
421 *negotiatedProtocol,
422 id_,
423 overlay_);
424
425 overlay_.add_active(peer);
426 }
427 catch (std::exception const& e)
428 {
429 return fail(std::string("Handshake failure (") + e.what() + ")");
430 }
431}
432
433} // namespace ripple
T append(T... args)
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:39
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:149
bool isArray() const
UInt size() const
Number of values in array or object.
bool isObject() const
bool isMember(char const *key) const
Return true if the object has a member named key.
A generic endpoint for log messages.
Definition Journal.h:60
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
virtual Config & config()=0
virtual Cluster & cluster()=0
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:38
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:248
bool LEDGER_REPLAY
Definition Config.h:223
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:258
bool COMPRESSION
Definition Config.h:220
std::uint32_t const id_
boost::asio::ip::tcp::socket socket_type
ConnectAttempt(Application &app, boost::asio::io_context &io_context, endpoint_type const &remote_endpoint, Resource::Consumer usage, shared_context const &context, std::uint32_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
boost::system::error_code error_code
std::unique_ptr< stream_type > stream_ptr_
std::shared_ptr< PeerFinder::Slot > slot_
Resource::Consumer usage_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
void onHandshake(error_code ec)
boost::asio::ip::tcp::endpoint endpoint_type
void onWrite(error_code ec)
void onTimer(error_code ec)
void onShutdown(error_code ec)
boost::beast::ssl_stream< middle_type > stream_type
boost::beast::multi_buffer read_buf_
void onConnect(error_code ec)
void fail(std::string const &reason)
void onRead(error_code ec)
static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const &s, boost::system::error_code &ec)
beast::Journal const journal_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
endpoint_type remote_endpoint_
PeerFinder::Manager & peerFinder()
void add_active(std::shared_ptr< PeerImp > const &peer)
static bool isPeerUpgrade(http_request_type const &request)
Setup const & setup() const
virtual bool onConnected(std::shared_ptr< Slot > const &slot, beast::IP::Endpoint const &local_endpoint)=0
Called when an outbound connection attempt succeeds.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void onRedirects(boost::asio::ip::tcp::endpoint const &remote_address, std::vector< boost::asio::ip::tcp::endpoint > const &eps)=0
Called when we received redirect IPs from a busy peer.
virtual Result activate(std::shared_ptr< Slot > const &slot, PublicKey const &key, bool reserved)=0
Request an active slot type.
An endpoint that consumes resources.
Definition Consumer.h:35
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
void buildHandshake(boost::beast::http::fields &h, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
STL namespace.
T push_back(T... args)
T reserve(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
beast::IP::Address public_ip
Definition Overlay.h:69
std::optional< std::uint32_t > networkID
Definition Overlay.h:72
bool peerPrivate
true if we want our IP address kept private.
T what(T... args)