removes socket_ and stream_ member attributes

This commit is contained in:
Vito
2025-09-02 15:45:50 +02:00
parent e0b9812fc5
commit 0ac24f5f73
2 changed files with 30 additions and 28 deletions

View File

@@ -82,10 +82,8 @@ PeerImp::PeerImp(
, journal_(sink_)
, p_journal_(p_sink_)
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
, strand_(boost::asio::make_strand(socket_.get_executor()))
, timer_(waitable_timer{socket_.get_executor()})
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
, timer_(waitable_timer{stream_ptr_->get_executor()})
, remote_address_(slot->remote_endpoint())
, overlay_(overlay)
, inbound_(true)
@@ -215,7 +213,7 @@ PeerImp::stop()
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
if (socket_.is_open())
if (socketOpen())
{
// The rationale for using different severity levels is that
// outbound connections are under our control and may be logged
@@ -288,7 +286,7 @@ PeerImp::send(std::shared_ptr<Message> const& m)
return;
boost::asio::async_write(
stream_,
*stream_ptr_,
boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
@@ -300,6 +298,12 @@ PeerImp::send(std::shared_ptr<Message> const& m)
std::placeholders::_2)));
}
bool
PeerImp::socketOpen() const
{
return stream_ptr_->next_layer().socket().is_open();
}
void
PeerImp::sendTxQueue()
{
@@ -578,13 +582,13 @@ PeerImp::close()
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::close : strand in this thread");
if (socket_.is_open())
if (socketOpen())
{
detaching_ = true; // DEPRECATED
try
{
timer_.cancel();
socket_.close();
stream_ptr_->lowest_layer().close();
}
catch (boost::system::system_error const&)
{
@@ -613,7 +617,7 @@ PeerImp::fail(std::string const& reason)
(void(Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
if (journal_.active(beast::severities::kWarning) && socketOpen())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
@@ -628,7 +632,7 @@ PeerImp::fail(std::string const& name, error_code ec)
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::fail : strand in this thread");
if (socket_.is_open())
if (socketOpen())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
@@ -644,7 +648,7 @@ PeerImp::gracefulClose()
strand_.running_in_this_thread(),
"ripple::PeerImp::gracefulClose : strand in this thread");
XRPL_ASSERT(
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
socketOpen(), "ripple::PeerImp::gracefulClose : socket is open");
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
@@ -652,7 +656,7 @@ PeerImp::gracefulClose()
if (send_queue_.size() > 0)
return;
setTimer();
stream_.async_shutdown(bind_executor(
stream_ptr_->async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
@@ -703,7 +707,7 @@ PeerImp::makePrefix(id_t id)
void
PeerImp::onTimer(error_code const& ec)
{
if (!socket_.is_open())
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
@@ -826,14 +830,14 @@ PeerImp::doAccept()
// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
stream_,
*stream_ptr_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!socket_.is_open())
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -903,7 +907,7 @@ PeerImp::doProtocolStart()
void
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -943,7 +947,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
if (ec)
return fail("onReadMessage", ec);
if (!socket_.is_open())
if (!socketOpen())
return;
if (gracefulClose_)
return;
@@ -953,7 +957,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
}
// Timeout on writes only
stream_.async_read_some(
stream_ptr_->async_read_some(
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
bind_executor(
strand_,
@@ -967,7 +971,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
void
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -991,7 +995,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
// Timeout on writes only
return boost::asio::async_write(
stream_,
*stream_ptr_,
boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
@@ -1005,7 +1009,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
if (gracefulClose_)
{
return stream_.async_shutdown(bind_executor(
return stream_ptr_->async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,

View File

@@ -60,7 +60,6 @@ public:
private:
using clock_type = std::chrono::steady_clock;
using error_code = boost::system::error_code;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using address_type = boost::asio::ip::address;
@@ -76,8 +75,6 @@ private:
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<stream_type> stream_ptr_;
socket_type& socket_;
stream_type& stream_;
boost::asio::strand<boost::asio::executor> strand_;
waitable_timer timer_;
@@ -519,6 +516,9 @@ private:
handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m);
bool
socketOpen() const;
public:
//--------------------------------------------------------------------------
//
@@ -667,10 +667,8 @@ PeerImp::PeerImp(
, journal_(sink_)
, p_journal_(p_sink_)
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
, strand_(boost::asio::make_strand(socket_.get_executor()))
, timer_(waitable_timer{socket_.get_executor()})
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
, timer_(waitable_timer{stream_ptr_->get_executor()})
, remote_address_(slot->remote_endpoint())
, overlay_(overlay)
, inbound_(false)