Fix TSAN issues part1 (#788)

Fixes a few issues from boost 1.82 migration and some Conan misconfigurations
This commit is contained in:
Alex Kremer
2023-07-26 21:39:39 +01:00
committed by GitHub
parent 02621fe02e
commit 68eec01dbc
18 changed files with 84 additions and 160 deletions

View File

@@ -1,5 +1,5 @@
find_package(OpenSSL 1.1.1 REQUIRED)
set_target_properties(OpenSSL::SSL PROPERTIES
INTERFACE_COMPILE_DEFINITIONS OPENSSL_NO_SSL2
)
find_package(OpenSSL 1.1.1 REQUIRED)

View File

@@ -38,23 +38,15 @@ Now you should be able to download prebuilt `xrpl` package on some platforms. At
conan remove -f xrpl/1.12.0-b2
conan remove -f cassandra-cpp-driver/2.16.2
```
3. In a clone of rippled from step 1, build rippled as per their instructions. pay attention to these commands:
```sh
conan export external/snappy snappy/1.1.9@
conan export external/soci soci/4.0.3@
mkdir build && cd build
conan install .. --output-folder . --build missing --settings build_type=Release
cmake -DCMAKE_TOOLCHAIN_FILE:FILEPATH=build/generators/conan_toolchain.cmake -DCMAKE_BUILD_TYPE=Release ..
cmake --build . --parallel 8 # or without the number if you feel extra adventurous
cd - # to go back to root of rippled
```
4. Perform this command at the root directory of rippled
3. In a clone of rippled from step 1, setup rippled as per their instructions. Note that there is no need to build rippled, only make it available to Conan. Pay attention to these commands:
```sh
conan export external/snappy
conan export external/soci
conan export .
```
this will export a local package `xrpl/1.12.0-b2`.
5. Navigate to clio's root directory and perform
4. Navigate to clio's root directory and perform
```sh
conan export external/cassandra # export our "custom" cassandra driver package
mkdir build && cd build
@@ -67,11 +59,10 @@ Please note that a few unittests are currently failing. See below.
## Things to fix
1. Figure out what to do with `ripple::Fees` that is now missing the `units` member. It was used in a few places and couple unittests are broken because of it.
2. Fix build on CI (currently using old CMake. need to use conan instead).
3. Fix code coverage support (see 'coverage' option in conanfile).
4. See if we can contribute/push our cassandra-cpp-driver to conan center so we don't need to export it before we able to use it.
5. Try to improve the new asio code that is using `async_compose` and potentially the `FutureWithCallback` way of accepting the callback.
1. Fix build on CI (currently using old CMake. need to use conan instead).
2. Fix code coverage support (see 'coverage' option in conanfile).
3. See if we can contribute/push our cassandra-cpp-driver to conan center so we don't need to export it before we able to use it.
4. Try to improve the new asio code that is using `async_compose` and potentially the `FutureWithCallback` way of accepting the callback.
# Clio
Clio is an XRP Ledger API server. Clio is optimized for RPC calls, over WebSocket or JSON-RPC. Validated

View File

@@ -93,38 +93,17 @@ template <class F>
auto
synchronous(F&& f)
{
/** @brief Serialized handlers and their execution.
*
* The ctx class is converted into a serialized handler, also named
* ctx, and is used to pass a stream of data into the method.
*/
boost::asio::io_context ctx;
boost::asio::strand<boost::asio::io_context::executor_type> strand(ctx.get_executor());
std::optional<boost::asio::io_context::work> work;
/*! @brief Place the ctx within the vector of serialized handlers. */
work.emplace(ctx);
/**
* @brief If/else statements regarding coroutine type matching.
*
* R is the currently executing coroutine that is about to get passed.
* If corountine types do not match, the current one's type is stored.
*/
using R = typename boost::result_of<F(boost::asio::yield_context&)>::type;
if constexpr (!std::is_same<R, void>::value)
{
/**
* @brief When the coroutine type is the same
*
* The spawn function enables programs to implement asynchronous logic
* in a synchronous manner. res stores the instance of the currently
* executing coroutine, yield. The different type is returned.
*/
R res;
boost::asio::spawn(strand, [&f, &work, &res](boost::asio::yield_context yield) {
boost::asio::spawn(
strand, [&f, &res, _ = boost::asio::make_work_guard(strand)](boost::asio::yield_context yield) {
res = f(yield);
work.reset();
;
});
ctx.run();
@@ -132,10 +111,9 @@ synchronous(F&& f)
}
else
{
/*! @brief When the corutine type is different, run as normal. */
boost::asio::spawn(strand, [&f, &work](boost::asio::yield_context yield) {
boost::asio::spawn(strand, [&f, _ = boost::asio::make_work_guard(strand)](boost::asio::yield_context yield) {
f(yield);
work.reset();
;
});
ctx.run();

View File

@@ -232,14 +232,16 @@ public:
future.emplace(handle_.get().asyncExecute(
statements, [sself = std::make_shared<Self>(std::move(self))](auto&& res) mutable {
boost::asio::post(
sself->get_io_executor(), [sself = std::move(sself), res = std::move(res)]() mutable {
boost::asio::get_associated_executor(*sself),
[sself = std::move(sself), res = std::move(res)]() mutable {
sself->complete(std::move(res));
sself.reset();
});
}));
};
auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(init, token);
auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
init, token, boost::asio::get_associated_executor(token));
numReadRequestsOutstanding_ -= numStatements;
if (res)
@@ -276,19 +278,20 @@ public:
// TODO: see if we can avoid using shared_ptr for self here
auto init = [this, &statement, &future]<typename Self>(Self& self) {
future.emplace(handle_.get().asyncExecute(
statement, [sself = std::make_shared<Self>(std::move(self))](auto&& res) mutable {
statement, [sself = std::make_shared<Self>(std::move(self))](auto&&) mutable {
boost::asio::post(
sself->get_io_executor(), [sself = std::move(sself), res = std::move(res)]() mutable {
sself->complete(std::move(res));
boost::asio::get_associated_executor(*sself), [sself = std::move(sself)]() mutable {
sself->complete();
sself.reset();
});
}));
};
auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(init, token);
boost::asio::async_compose<CompletionTokenType, void()>(
init, token, boost::asio::get_associated_executor(token));
--numReadRequestsOutstanding_;
if (res)
if (auto res = future->get(); res)
{
return res;
}
@@ -329,7 +332,8 @@ public:
// when all async operations complete unblock the result
if (--numOutstanding == 0)
boost::asio::post(sself->get_io_executor(), [sself = std::move(sself)]() mutable {
boost::asio::post(
boost::asio::get_associated_executor(*sself), [sself = std::move(sself)]() mutable {
sself->complete();
sself.reset();
});
@@ -344,7 +348,8 @@ public:
});
};
boost::asio::async_compose<CompletionTokenType, void()>(init, token);
boost::asio::async_compose<CompletionTokenType, void()>(
init, token, boost::asio::get_associated_executor(token));
numReadRequestsOutstanding_ -= statements.size();
if (hadError)

View File

@@ -46,7 +46,7 @@ public:
/**
* @brief Create a new retry policy instance with the io_context provided
*/
ExponentialBackoffRetryPolicy(boost::asio::io_context& ioc) : timer_{ioc}
ExponentialBackoffRetryPolicy(boost::asio::io_context& ioc) : timer_{boost::asio::make_strand(ioc)}
{
}

View File

@@ -79,12 +79,13 @@ SourceImpl<Derived>::reconnect(boost::beast::error_code ec)
{
err = std::string(" (") + boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value())) + "," +
boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value())) + ") ";
// ERR_PACK /* crypto/err/err.h */
char buf[128];
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
err += buf;
std::cout << err << std::endl;
log_.error() << err;
}
if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused)
@@ -361,9 +362,7 @@ SourceImpl<Derived>::onRead(boost::beast::error_code ec, size_t size)
}
else
{
handleMessage();
boost::beast::flat_buffer buffer;
swap(readBuffer_, buffer);
handleMessage(size);
log_.trace() << "calling async_read - " << toString();
derived().ws().async_read(readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
@@ -372,7 +371,7 @@ SourceImpl<Derived>::onRead(boost::beast::error_code ec, size_t size)
template <class Derived>
bool
SourceImpl<Derived>::handleMessage()
SourceImpl<Derived>::handleMessage(size_t size)
{
log_.trace() << toString();
@@ -380,41 +379,40 @@ SourceImpl<Derived>::handleMessage()
connected_ = true;
try
{
std::string msg{static_cast<char const*>(readBuffer_.data().data()), readBuffer_.size()};
log_.trace() << msg;
boost::json::value raw = boost::json::parse(msg);
log_.trace() << "parsed";
boost::json::object response = raw.as_object();
auto const msg = boost::beast::buffers_to_string(readBuffer_.data());
readBuffer_.consume(size);
auto const raw = boost::json::parse(msg);
auto const response = raw.as_object();
uint32_t ledgerIndex = 0;
if (response.contains("result"))
{
boost::json::object result = response["result"].as_object();
auto const& result = response.at("result").as_object();
if (result.contains("ledger_index"))
{
ledgerIndex = result["ledger_index"].as_int64();
}
ledgerIndex = result.at("ledger_index").as_int64();
if (result.contains("validated_ledgers"))
{
boost::json::string const& validatedLedgers = result["validated_ledgers"].as_string();
setValidatedRange({validatedLedgers.c_str(), validatedLedgers.size()});
auto const& validatedLedgers = result.at("validated_ledgers").as_string();
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
}
log_.info() << "Received a message on ledger "
<< " subscription stream. Message : " << response << " - " << toString();
}
else if (response.contains("type") && response["type"] == "ledgerClosed")
else if (response.contains("type") && response.at("type") == "ledgerClosed")
{
log_.info() << "Received a message on ledger "
<< " subscription stream. Message : " << response << " - " << toString();
if (response.contains("ledger_index"))
{
ledgerIndex = response["ledger_index"].as_int64();
ledgerIndex = response.at("ledger_index").as_int64();
}
if (response.contains("validated_ledgers"))
{
boost::json::string const& validatedLedgers = response["validated_ledgers"].as_string();
setValidatedRange({validatedLedgers.c_str(), validatedLedgers.size()});
auto const& validatedLedgers = response.at("validated_ledgers").as_string();
setValidatedRange({validatedLedgers.data(), validatedLedgers.size()});
}
}
else
@@ -426,11 +424,11 @@ SourceImpl<Derived>::handleMessage()
forwardCache_.freshen();
subscriptions_->forwardProposedTransaction(response);
}
else if (response.contains("type") && response["type"] == "validationReceived")
else if (response.contains("type") && response.at("type") == "validationReceived")
{
subscriptions_->forwardValidation(response);
}
else if (response.contains("type") && response["type"] == "manifestReceived")
else if (response.contains("type") && response.at("type") == "manifestReceived")
{
subscriptions_->forwardManifest(response);
}

View File

@@ -190,7 +190,7 @@ public:
, balancer_(balancer)
, forwardCache_(config, ioContext, *this)
, ioc_(ioContext)
, timer_(ioContext)
, timer_(boost::asio::make_strand(ioContext))
, hooks_(hooks)
{
static boost::uuids::random_generator uuidGenerator;
@@ -446,7 +446,7 @@ public:
* @return true if the message was handled successfully. false on error
*/
bool
handleMessage();
handleMessage(size_t size);
/**
* @brief Forward a request to rippled

View File

@@ -202,7 +202,7 @@ try
auto workQueue = WorkQueue::make_WorkQueue(config);
auto counters = RPC::Counters::make_Counters(workQueue);
auto const handlerProvider = std::make_shared<RPC::detail::ProductionHandlerProvider const>(
backend, subscriptions, balancer, etl, counters, config);
config, backend, subscriptions, balancer, etl, counters);
auto const rpcEngine = RPC::RPCEngine::make_RPCEngine(
config, backend, subscriptions, balancer, etl, dosGuard, workQueue, counters, handlerProvider);

View File

@@ -56,12 +56,12 @@
namespace RPC::detail {
ProductionHandlerProvider::ProductionHandlerProvider(
clio::Config const& config,
std::shared_ptr<BackendInterface> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptionManager,
std::shared_ptr<LoadBalancer> const& balancer,
std::shared_ptr<ETLService const> const& etl,
Counters const& counters,
clio::Config const& config)
Counters const& counters)
: handlerMap_{
{"account_channels", {AccountChannelsHandler{backend}}},
{"account_currencies", {AccountCurrenciesHandler{backend}}},

View File

@@ -50,12 +50,12 @@ class ProductionHandlerProvider final : public HandlerProvider
public:
ProductionHandlerProvider(
clio::Config const& config,
std::shared_ptr<BackendInterface> const& backend,
std::shared_ptr<SubscriptionManager> const& subscriptionManager,
std::shared_ptr<LoadBalancer> const& balancer,
std::shared_ptr<ETLService const> const& etl,
Counters const& counters,
clio::Config const& config);
Counters const& counters);
bool
contains(std::string const& method) const override;

View File

@@ -251,7 +251,7 @@ class IntervalSweepHandler
std::reference_wrapper<boost::asio::io_context> ctx_;
BaseDOSGuard* dosGuard_ = nullptr;
boost::asio::steady_timer timer_{ctx_.get()};
boost::asio::steady_timer timer_{boost::asio::make_strand(ctx_.get())};
public:
/**

View File

@@ -150,7 +150,7 @@ public:
, tagFactory_(std::move(tagFactory))
, dosGuard_(std::ref(dosGuard))
, handler_(callback)
, acceptor_(boost::asio::make_strand(ioc))
, acceptor_(boost::asio::make_strand(ioc.get_executor()))
{
boost::beast::error_code ec;
@@ -190,7 +190,7 @@ private:
doAccept()
{
acceptor_.async_accept(
boost::asio::make_strand(ioc_.get()),
boost::asio::make_strand(ioc_.get().get_executor()),
boost::beast::bind_front_handler(&Server::onAccept, shared_from_this()));
}

View File

@@ -32,6 +32,7 @@ using namespace Backend::Cassandra;
namespace json = boost::json;
// TODO: get rid of cout and use expectations instead to verify output
class BackendCassandraBaseTest : public NoLoggerFixture
{
protected:

View File

@@ -74,6 +74,12 @@ struct FakeFuture
{
return data;
}
FakeMaybeError
await() const
{
return {};
}
};
struct FakeFutureWithCallback : public FakeFuture

View File

@@ -23,7 +23,6 @@
#include <rpc/common/AnyHandler.h>
#include <rpc/handlers/VersionHandler.h>
constexpr static auto DEFAULT_API_VERSION = 3u;
constexpr static auto MIN_API_VERSION = 2u;
constexpr static auto MAX_API_VERSION = 10u;

View File

@@ -167,13 +167,16 @@ struct SyncAsioContextTest : virtual public NoLoggerFixture
void
runSpawn(F&& f)
{
using namespace boost::asio;
auto called = false;
auto work = std::optional<boost::asio::io_context::work>{ctx};
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
auto strand = make_strand(ctx.get_executor());
spawn(strand, [&, _ = make_work_guard(strand)](yield_context yield) {
f(yield);
called = true;
work.reset();
});
ctx.run();
ASSERT_TRUE(called);
ctx.reset();

View File

@@ -37,14 +37,10 @@ struct HttpSyncClient
{
boost::asio::io_context ioc;
// These objects perform our I/O
net::ip::tcp::resolver resolver(ioc);
boost::beast::tcp_stream stream(ioc);
// Look up the domain name
auto const results = resolver.resolve(host, port);
// Make the connection on the IP address we get from a lookup
stream.connect(results);
http::request<http::string_body> req{http::verb::post, "/", 10};
@@ -52,18 +48,12 @@ struct HttpSyncClient
req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
req.body() = std::string(body);
req.prepare_payload();
// Send the HTTP request to the remote host
http::write(stream, req);
// This buffer is used for reading and must be persisted
boost::beast::flat_buffer buffer;
// Declare a container to hold the response
http::response<http::string_body> res;
// Receive the HTTP response
http::read(stream, buffer, res);
// Gracefully close the socket
boost::beast::error_code ec;
stream.socket().shutdown(tcp::socket::shutdown_both, ec);
@@ -73,10 +63,7 @@ struct HttpSyncClient
class WebSocketSyncClient
{
// The io_context is required for all I/O
net::io_context ioc_;
// These objects perform our I/O
tcp::resolver resolver_{ioc_};
boost::beast::websocket::stream<tcp::socket> ws_{ioc_};
@@ -84,10 +71,7 @@ public:
void
connect(std::string const& host, std::string const& port)
{
// Look up the domain name
auto const results = resolver_.resolve(host, port);
// Make the connection on the IP address we get from a lookup
auto const ep = net::connect(ws_.next_layer(), results);
// Update the host_ string. This will provide the value of the
@@ -95,12 +79,10 @@ public:
// See https://tools.ietf.org/html/rfc7230#section-5.4
auto const hostPort = host + ':' + std::to_string(ep.port());
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::request_type& req) {
req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
}));
// Perform the websocket handshake
ws_.handshake(hostPort, "/");
}
@@ -113,13 +95,9 @@ public:
std::string
syncPost(std::string const& body)
{
// Send the message
ws_.write(net::buffer(std::string(body)));
// This buffer will hold the incoming message
boost::beast::flat_buffer buffer;
// Read a message into our buffer
ws_.write(net::buffer(std::string(body)));
ws_.read(buffer);
return boost::beast::buffers_to_string(buffer.data());
@@ -137,60 +115,38 @@ struct HttpsSyncClient
static std::string
syncPost(std::string const& host, std::string const& port, std::string const& body)
{
// The io_context is required for all I/O
net::io_context ioc;
boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
ctx.set_default_verify_paths();
// Verify the remote server's certificate
ctx.set_verify_mode(ssl::verify_none);
// These objects perform our I/O
tcp::resolver resolver(ioc);
boost::beast::ssl_stream<boost::beast::tcp_stream> stream(ioc, ctx);
// disable ssl verification just for testing
// stream.set_verify_callback(HttpsSyncClient::verify_certificate);
// Set SNI Hostname (many hosts need this to handshake successfully)
if (!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str()))
{
boost::beast::error_code ec{static_cast<int>(::ERR_get_error()), net::error::get_ssl_category()};
throw boost::beast::system_error{ec};
}
// Look up the domain name
auto const results = resolver.resolve(host, port);
// Make the connection on the IP address we get from a lookup
boost::beast::get_lowest_layer(stream).connect(results);
// Perform the SSL handshake
stream.handshake(ssl::stream_base::client);
// Set up an HTTP GET request message
http::request<http::string_body> req{http::verb::post, "/", 10};
req.set(http::field::host, host);
req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
req.body() = std::string(body);
req.prepare_payload();
// Send the HTTP request to the remote host
http::write(stream, req);
// This buffer is used for reading and must be persisted
boost::beast::flat_buffer buffer;
// Declare a container to hold the response
http::response<http::string_body> res;
// Receive the HTTP response
http::read(stream, buffer, res);
// Write the message to standard out
std::cout << res << std::endl;
// Gracefully close the stream
boost::beast::error_code ec;
stream.shutdown(ec);
return std::string(res.body());
}
};
@@ -206,27 +162,19 @@ public:
{
boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
ctx.set_default_verify_paths();
// Verify the remote server's certificate
ctx.set_verify_mode(ssl::verify_none);
// These objects perform our I/O
tcp::resolver resolver{ioc_};
ws_.emplace(ioc_, ctx);
// Look up the domain name
auto const results = resolver.resolve(host, port);
// Make the connection on the IP address we get from a lookup
net::connect(ws_->next_layer().next_layer(), results.begin(), results.end());
// Perform the SSL handshake
ws_->next_layer().handshake(ssl::stream_base::client);
// Set a decorator to change the User-Agent of the handshake
ws_->set_option(boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::request_type& req) {
req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
}));
// Perform the websocket handshake
ws_->handshake(host, "/");
}
@@ -239,13 +187,8 @@ public:
std::string
syncPost(std::string const& body)
{
// Send the message
ws_->write(net::buffer(std::string(body)));
// This buffer will hold the incoming message
boost::beast::flat_buffer buffer;
// Read a message into our buffer
ws_->write(net::buffer(std::string(body)));
ws_->read(buffer);
return boost::beast::buffers_to_string(buffer.data());