Destroy websocket on reconnect when using SslETLSource

This commit is contained in:
Nathan Nichols
2021-08-02 10:59:53 -05:00
committed by CJ Cobb
parent ec7608680e
commit c8681cc0fa
2 changed files with 84 additions and 29 deletions

View File

@@ -21,8 +21,8 @@ ETLSourceImpl<Derived>::ETLSourceImpl(
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer)
: ioc_(ioContext)
, resolver_(boost::asio::make_strand(ioc_))
, timer_(ioc_)
, resolver_(boost::asio::make_strand(ioContext))
, timer_(ioContext)
, networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend)
, subscriptions_(subscriptions)
@@ -107,13 +107,12 @@ ETLSourceImpl<Derived>::reconnect(boost::beast::error_code ec)
timer_.async_wait([this](auto ec) {
bool startAgain = (ec != boost::asio::error::operation_aborted);
BOOST_LOG_TRIVIAL(trace) << __func__ << " async_wait : ec = " << ec;
close(startAgain);
derived().close(startAgain);
});
}
template <class Derived>
void
ETLSourceImpl<Derived>::close(bool startAgain)
PlainETLSource::close(bool startAgain)
{
timer_.cancel();
ioc_.post([this, startAgain]() {
@@ -147,6 +146,54 @@ ETLSourceImpl<Derived>::close(bool startAgain)
});
}
void
SslETLSource::close(bool startAgain)
{
timer_.cancel();
ioc_.post([this, startAgain]() {
if (closing_)
return;
if (derived().ws().is_open())
{
// onStop() also calls close(). If the async_close is called twice,
// an assertion fails. Using closing_ makes sure async_close is only
// called once
closing_ = true;
derived().ws().async_close(
boost::beast::websocket::close_code::normal,
[this, startAgain](auto ec) {
if (ec)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " async_close : "
<< "error code = " << ec << " - " << toString();
}
closing_ = false;
if (startAgain)
{
ws_ = std::make_unique<boost::beast::websocket::stream<
boost::beast::ssl_stream<
boost::beast::tcp_stream>>>(
boost::asio::make_strand(ioc_), *sslCtx_);
run();
}
});
}
else if (startAgain)
{
ws_ = std::make_unique<boost::beast::websocket::stream<
boost::beast::ssl_stream<
boost::beast::tcp_stream>>>(
boost::asio::make_strand(ioc_), *sslCtx_);
run();
}
});
}
template <class Derived>
void
ETLSourceImpl<Derived>::onResolve(

View File

@@ -69,8 +69,6 @@ class ETLSourceImpl : public ETLSource
std::string grpcPort_;
boost::asio::io_context& ioc_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
boost::asio::ip::tcp::resolver resolver_;
@@ -87,8 +85,6 @@ class ETLSourceImpl : public ETLSource
mutable std::mutex mtx_;
std::atomic_bool closing_{false};
std::atomic_bool connected_{false};
// true if this ETL source is forwarding transactions received on the
@@ -101,13 +97,28 @@ class ETLSourceImpl : public ETLSource
std::chrono::system_clock::time_point lastMsgTime_;
mutable std::mutex lastMsgTimeMtx_;
// used for retrying connections
boost::asio::steady_timer timer_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
ETLLoadBalancer& balancer_;
protected:
Derived&
derived()
{
return static_cast<Derived&>(*this);
}
std::string ip_;
size_t numFailures_ = 0;
boost::asio::io_context& ioc_;
// used for retrying connections
boost::asio::steady_timer timer_;
std::atomic_bool closing_{false};
void
run() override
{
@@ -121,17 +132,6 @@ class ETLSourceImpl : public ETLSource
});
}
protected:
Derived&
derived()
{
return static_cast<Derived&>(*this);
}
std::string ip_;
size_t numFailures_ = 0;
public:
~ETLSourceImpl()
@@ -315,11 +315,6 @@ public:
bool
handleMessage();
/// Close the websocket
/// @param startAgain whether to reconnect
void
close(bool startAgain);
/// Get grpc stub to forward requests to rippled node
/// @return stub to send requests to ETL source
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
@@ -355,6 +350,11 @@ public:
boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) override;
/// Close the websocket
/// @param startAgain whether to reconnect
void
close(bool startAgain);
boost::beast::websocket::stream<boost::beast::tcp_stream>&
ws()
{
@@ -364,6 +364,8 @@ public:
class SslETLSource : public ETLSourceImpl<SslETLSource>
{
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx_;
std::unique_ptr<boost::beast::websocket::stream<boost::beast::ssl_stream<
boost::beast::tcp_stream>>> ws_;
@@ -377,10 +379,11 @@ public:
std::shared_ptr<NetworkValidatedLedgers> nwvl,
ETLLoadBalancer& balancer)
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
, sslCtx_(sslCtx)
, ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::ssl_stream<
boost::beast::tcp_stream>>>(
boost::asio::make_strand(ioc), *sslCtx))
boost::asio::make_strand(ioc_), *sslCtx_))
{
}
@@ -394,6 +397,11 @@ public:
boost::beast::error_code ec,
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
/// Close the websocket
/// @param startAgain whether to reconnect
void
close(bool startAgain);
boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>&
ws()
{