// Unit tests for etl::impl::SubscriptionSource, driven against a local TestWsServer.
//
// NOTE(review): the text this file was reviewed from had lost everything written between angle
// brackets. The <...> header names and the template arguments below (MockFunction signatures,
// duration_cast targets, the makeMock gauge type) were restored from how the values are used in
// this file -- please confirm them against version control.

#include "etl/impl/SubscriptionSource.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/Spawn.hpp"
#include "util/TestWsServer.hpp"
#include "util/prometheus/Gauge.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/serialize.hpp>
#include <fmt/core.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <chrono>
#include <cstdint>
#include <optional>
#include <string>
#include <thread>
#include <utility>

using namespace etl::impl;
using testing::MockFunction;
using testing::StrictMock;

/**
 * @brief Common fixture: a SubscriptionSource pointed at a local TestWsServer.
 *
 * The source is started in the constructor. Every collaborator is a strict mock, so any hook or
 * forwarding call that a test does not explicitly expect fails that test.
 */
struct SubscriptionSourceConnectionTestsBase : SyncAsioContextTest {
    SubscriptionSourceConnectionTestsBase()
    {
        subscriptionSource_.run();
    }

    /**
     * @brief Schedule subscriptionSource_.stop() on the test io context.
     *
     * Tests call this from the disconnect hook so that runContext() can finish.
     */
    void
    stopSubscriptionSource()
    {
        util::spawn(ctx_, [this](auto&& yield) { subscriptionSource_.stop(yield); });
    }

    /**
     * @brief Accept the source's connection on the server side and check its subscribe request.
     *
     * ASSERT_* macros may only be used in functions returning void, hence the immediately
     * invoked lambdas.
     *
     * @param yield The coroutine context to run the server-side operations on
     * @return The accepted server-side connection
     */
    [[maybe_unused]] TestWsConnection
    serverConnection(boost::asio::yield_context yield)
    {
        // The first one is an SSL attempt
        auto failedConnection = wsServer_.acceptConnection(yield);
        [&]() { ASSERT_FALSE(failedConnection); }();

        auto connection = wsServer_.acceptConnection(yield);
        [&]() { ASSERT_TRUE(connection) << connection.error().message(); }();

        auto message = connection->receive(yield);
        [&]() {
            ASSERT_TRUE(message);
            EXPECT_EQ(
                message.value(),
                R"JSON({"command":"subscribe","streams":["ledger","manifests","validations","transactions_proposed"]})JSON"
            );
        }();
        return std::move(connection).value();
    }

protected:
    TestWsServer wsServer_{ctx_, "0.0.0.0"};

    StrictMockNetworkValidatedLedgersPtr networkValidatedLedgers_;
    StrictMockSubscriptionManagerSharedPtr subscriptionManager_;

    StrictMock<MockFunction<void()>> onConnectHook_;
    StrictMock<MockFunction<void(bool)>> onDisconnectHook_;
    StrictMock<MockFunction<void()>> onLedgerClosedHook_;

    // Both trailing durations are 5ms so that failure and retry paths run quickly in tests.
    SubscriptionSource subscriptionSource_{
        ctx_,
        "127.0.0.1",
        wsServer_.port(),
        networkValidatedLedgers_,
        subscriptionManager_,
        onConnectHook_.AsStdFunction(),
        onDisconnectHook_.AsStdFunction(),
        onLedgerClosedHook_.AsStdFunction(),
        std::chrono::milliseconds(5),
        std::chrono::milliseconds(5)
    };
};

struct SubscriptionSourceConnectionTests : util::prometheus::WithPrometheus, SubscriptionSourceConnectionTestsBase {};

// Nothing accepts the connection on the server side: the disconnect hook must fire.
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed)
{
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// Two failed attempts in a row: the disconnect hook fires for each of them.
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed)
{
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// The server closes the connection right after the subscribe request.
TEST_F(SubscriptionSourceConnectionTests, ReadError)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = serverConnection(yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// The server accepts the connection and then sends nothing for 10ms, which is longer than the
// 5ms durations configured in the fixture.
TEST_F(SubscriptionSourceConnectionTests, ReadTimeout)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = serverConnection(yield);
        std::this_thread::sleep_for(std::chrono::milliseconds{10});
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// The server closes two consecutive connections: connect and disconnect hooks fire twice each.
TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        for (int i = 0; i < 2; ++i) {
            auto connection = serverConnection(yield);
            connection.close(yield);
        }
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// isConnected() is false before connecting, true inside the connect hook and false again inside
// the disconnect hook.
TEST_F(SubscriptionSourceConnectionTests, IsConnected)
{
    EXPECT_FALSE(subscriptionSource_.isConnected());
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = serverConnection(yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); });
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() {
        EXPECT_FALSE(subscriptionSource_.isConnected());
        stopSubscriptionSource();
    });
    runContext();
}

/**
 * @brief Fixture for tests that feed a single server message to the source.
 */
struct SubscriptionSourceReadTestsBase : public SubscriptionSourceConnectionTestsBase {
    /**
     * @brief Accept the source's connection and send it one message.
     *
     * @param message The raw text to send to the source
     * @param yield The coroutine context to run the server-side operations on
     * @return The server-side connection, still open
     */
    [[maybe_unused]] TestWsConnection
    connectAndSendMessage(std::string const& message, boost::asio::yield_context yield)
    {
        auto connection = serverConnection(yield);
        auto error = connection.send(message, yield);
        [&]() { ASSERT_FALSE(error) << *error; }();
        return connection;
    }
};

struct SubscriptionSourceReadTests : util::prometheus::WithPrometheus, SubscriptionSourceReadTestsBase {};

// A message that is not JSON: two connects and two disconnects are expected.
TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage("something", yield);
        // We have to schedule a receive so that the close frame is received; boost will then
        // handle it automatically
        connection.receive(yield);
        serverConnection(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// A well-formed result carrying no ledger data: only the connect and disconnect hooks fire.
TEST_F(SubscriptionSourceReadTests, GotResult)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"result":{}})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// A numeric ledger_index in a result is pushed to the network validated ledgers.
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"result":{"ledger_index":123}})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
    runContext();
}

// ledger_index given as a string: nothing is pushed and a reconnect is expected.
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"result":{"ledger_index":"123"}})JSON", yield);
        // We have to schedule a receive so that the close frame is received; boost will then
        // handle it automatically
        connection.receive(yield);
        serverConnection(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// validated_ledgers given as a number: a reconnect is expected.
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"result":{"validated_ledgers":123}})JSON", yield);
        // We have to schedule a receive so that the close frame is received; boost will then
        // handle it automatically
        connection.receive(yield);
        serverConnection(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// A validated_ledgers string in a result updates hasLedger() and validatedRange().
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
{
    EXPECT_FALSE(subscriptionSource_.hasLedger(123));
    EXPECT_FALSE(subscriptionSource_.hasLedger(124));
    EXPECT_FALSE(subscriptionSource_.hasLedger(455));
    EXPECT_FALSE(subscriptionSource_.hasLedger(456));
    EXPECT_FALSE(subscriptionSource_.hasLedger(457));
    EXPECT_FALSE(subscriptionSource_.hasLedger(32));
    EXPECT_FALSE(subscriptionSource_.hasLedger(31));
    EXPECT_FALSE(subscriptionSource_.hasLedger(789));
    EXPECT_FALSE(subscriptionSource_.hasLedger(790));

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"result":{"validated_ledgers":"123-456,789,32"}})JSON", yield
        );
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();

    EXPECT_TRUE(subscriptionSource_.hasLedger(123));
    EXPECT_TRUE(subscriptionSource_.hasLedger(124));
    EXPECT_TRUE(subscriptionSource_.hasLedger(455));
    EXPECT_TRUE(subscriptionSource_.hasLedger(456));
    EXPECT_FALSE(subscriptionSource_.hasLedger(457));
    EXPECT_TRUE(subscriptionSource_.hasLedger(32));
    EXPECT_FALSE(subscriptionSource_.hasLedger(31));
    EXPECT_TRUE(subscriptionSource_.hasLedger(789));
    EXPECT_FALSE(subscriptionSource_.hasLedger(790));

    EXPECT_EQ(subscriptionSource_.validatedRange(), "123-456,789,32");
}

// A validated_ledgers string that is not a valid range list ("123-456-789"): a reconnect is
// expected.
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"result":{"validated_ledgers":"123-456-789,32"}})JSON", yield
        );
        // We have to schedule a receive so that the close frame is received; boost will then
        // handle it automatically
        connection.receive(yield);
        serverConnection(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// A result carrying both fields: ledger_index is pushed and the validated range is updated.
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
{
    // The precondition covers the same ledgers (0..4) as the checks after runContext().
    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
    EXPECT_FALSE(subscriptionSource_.hasLedger(1));
    EXPECT_FALSE(subscriptionSource_.hasLedger(2));
    EXPECT_FALSE(subscriptionSource_.hasLedger(3));
    EXPECT_FALSE(subscriptionSource_.hasLedger(4));

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"result":{"ledger_index":123,"validated_ledgers":"1-3"}})JSON", yield
        );
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
    runContext();

    EXPECT_EQ(subscriptionSource_.validatedRange(), "1-3");
    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
    EXPECT_TRUE(subscriptionSource_.hasLedger(1));
    EXPECT_TRUE(subscriptionSource_.hasLedger(2));
    EXPECT_TRUE(subscriptionSource_.hasLedger(3));
    EXPECT_FALSE(subscriptionSource_.hasLedger(4));
}

// ledgerClosed while forwarding is not set: the ledger-closed hook must not fire.
TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"type":"ledgerClosed"})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// ledgerClosed while forwarding is set: the ledger-closed hook fires, the disconnect hook
// receives true, and isForwarding() is already false inside the disconnect hook.
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
{
    subscriptionSource_.setForwarding(true);

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"type": "ledgerClosed"})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onLedgerClosedHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() {
        EXPECT_FALSE(subscriptionSource_.isForwarding());
        stopSubscriptionSource();
    });
    runContext();
}

// A numeric ledger_index in a ledgerClosed message is pushed to the network validated ledgers.
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"type": "ledgerClosed","ledger_index": 123})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
    runContext();
}

// ledger_index given as a string in a ledgerClosed message: a reconnect is expected. The payload
// is well-formed JSON, so the string-typed field is the only thing wrong with it.
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"type":"ledgerClosed","ledger_index":"123"})JSON", yield
        );
        // We have to schedule a receive so that the close frame is received; boost will then
        // handle it automatically
        connection.receive(yield);
        serverConnection(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// validated_ledgers given as a number in a ledgerClosed message: a reconnect is expected.
// NOTE(review): "Gor" in the test name is a typo for "Got"; the name is left as is so that
// existing test filters keep matching.
TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_Reconnect)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"type":"ledgerClosed","validated_ledgers":123})JSON", yield
        );
        // We have to schedule a receive so that the close frame is received; boost will then
        // handle it automatically
        connection.receive(yield);
        serverConnection(yield);
    });

    EXPECT_CALL(onConnectHook_, Call()).Times(2);
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// A validated_ledgers string in a ledgerClosed message updates hasLedger() and validatedRange().
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
{
    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
    EXPECT_FALSE(subscriptionSource_.hasLedger(1));
    EXPECT_FALSE(subscriptionSource_.hasLedger(2));
    EXPECT_FALSE(subscriptionSource_.hasLedger(3));

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"type":"ledgerClosed","validated_ledgers":"1-2"})JSON", yield
        );
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();

    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
    EXPECT_TRUE(subscriptionSource_.hasLedger(1));
    EXPECT_TRUE(subscriptionSource_.hasLedger(2));
    EXPECT_FALSE(subscriptionSource_.hasLedger(3));
    EXPECT_EQ(subscriptionSource_.validatedRange(), "1-2");
}

// A ledgerClosed message carrying both fields: ledger_index is pushed and the validated range is
// updated.
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLedgers)
{
    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
    EXPECT_FALSE(subscriptionSource_.hasLedger(1));
    EXPECT_FALSE(subscriptionSource_.hasLedger(2));
    EXPECT_FALSE(subscriptionSource_.hasLedger(3));

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(
            R"JSON({"type":"ledgerClosed","ledger_index":123,"validated_ledgers":"1-2"})JSON", yield
        );
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
    runContext();

    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
    EXPECT_TRUE(subscriptionSource_.hasLedger(1));
    EXPECT_TRUE(subscriptionSource_.hasLedger(2));
    EXPECT_FALSE(subscriptionSource_.hasLedger(3));
    EXPECT_EQ(subscriptionSource_.validatedRange(), "1-2");
}

// A transaction message while forwarding is not set: nothing is forwarded.
TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"transaction":"some_transaction_data"})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// A transaction message while forwarding is set: it is forwarded as a proposed transaction.
TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
{
    subscriptionSource_.setForwarding(true);
    boost::json::object const message = {{"transaction", "some_transaction_data"}};

    // `message` is captured by reference; it outlives the coroutine because runContext() below
    // is called before this function returns.
    util::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message));
    runContext();
}

// A transaction message that also carries "meta" is not forwarded even though forwarding is set.
// NOTE(review): despite the "IsForwardingFalse" suffix this test runs with forwarding enabled;
// the name is left as is so that existing test filters keep matching.
TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
{
    subscriptionSource_.setForwarding(true);
    boost::json::object const message = {{"transaction", "some_transaction_data"}, {"meta", "some_meta_data"}};

    util::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0);
    runContext();
}

// validationReceived while forwarding is not set: nothing is forwarded.
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"type":"validationReceived"})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// validationReceived while forwarding is set: the message is forwarded as a validation.
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
{
    subscriptionSource_.setForwarding(true);
    boost::json::object const message = {{"type", "validationReceived"}};

    util::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*subscriptionManager_, forwardValidation(message));
    runContext();
}

// manifestReceived while forwarding is not set: nothing is forwarded.
// NOTE(review): "Maniefst" in the test name is a typo for "Manifest"; the name is left as is so
// that existing test filters keep matching.
TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(R"JSON({"type":"manifestReceived"})JSON", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();
}

// manifestReceived while forwarding is set: the message is forwarded as a manifest.
TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
{
    subscriptionSource_.setForwarding(true);
    boost::json::object const message = {{"type", "manifestReceived"}};

    util::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(*subscriptionManager_, forwardManifest(message));
    runContext();
}

// lastMessageTime() is less than 100ms old once a message has been received.
TEST_F(SubscriptionSourceReadTests, LastMessageTime)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage("some_message", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    runContext();

    auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime();
    auto const now = std::chrono::steady_clock::now();
    auto const diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - actualLastTimeMessage);
    EXPECT_LT(diff, std::chrono::milliseconds(100));
}

struct SubscriptionSourcePrometheusCounterTests : util::prometheus::WithMockPrometheus,
                                                  SubscriptionSourceReadTestsBase {};

// Receiving a message sets the "subscription_source_last_message_time" gauge to a value within
// one unit of the current system-clock time.
TEST_F(SubscriptionSourcePrometheusCounterTests, LastMessageTime)
{
    auto& lastMessageTimeMock = makeMock<util::prometheus::GaugeInt>(
        "subscription_source_last_message_time", fmt::format("{{source=\"127.0.0.1:{}\"}}", wsServer_.port())
    );

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto connection = connectAndSendMessage("some_message", yield);
        connection.close(yield);
    });

    EXPECT_CALL(onConnectHook_, Call());
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
    EXPECT_CALL(lastMessageTimeMock, set).WillOnce([](int64_t value) {
        // NOTE(review): the gauge is assumed to hold seconds since the epoch -- confirm the unit
        // against SubscriptionSource.
        auto const now =
            std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
                .count();
        EXPECT_LE(now - value, 1);
    });
    runContext();
}