// Unit tests for the proposed-transaction feed: subscribing and unsubscribing
// sessions (to the whole stream or to individual accounts), publishing a
// transaction, per-session API-version formatting, automatic cleanup through
// disconnect slots, and the gauge updates that accompany sub/unsub.
#include "feed/FeedTestUtil.hpp"
#include "feed/impl/ProposedTransactionFeed.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockWsBase.hpp"
#include "util/SyncExecutionCtxFixture.hpp"
#include "util/TestObject.hpp"
#include "util/prometheus/Gauge.hpp"
#include "web/SubscriptionContextInterface.hpp"

// NOTE(review): the six directives below have no header name in this copy of
// the file. The <...> targets look like they were stripped by whatever
// flattened the file - restore them from version control, do not guess.
#include
#include
#include
#include
#include
#include

namespace {

// Account identifiers used as subscription keys.
// kACCOUNT1 equals the "Account" field of kDUMMY_TRANSACTION.
constexpr auto kACCOUNT1 = "rh1HPuRVsYYvThxG2Bs1MfjmrVC73S16Fb";
// kACCOUNT2 equals the "Destination" field of kDUMMY_TRANSACTION.
constexpr auto kACCOUNT2 = "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun";
// kACCOUNT3 does not occur anywhere in kDUMMY_TRANSACTION; the tests use it for
// a subscriber that gets no send expectation.
constexpr auto kACCOUNT3 = "r92yNeoiCdwULRbjh6cUBEbD71iHcqe1hE";

// Payload handed to pub() in every test. It is also the message the tests
// expect to be sent to sessions for which no apiSubversion expectation returning
// 2 is configured.
constexpr auto kDUMMY_TRANSACTION = R"JSON({ "transaction": { "Account": "rh1HPuRVsYYvThxG2Bs1MfjmrVC73S16Fb", "Amount": "40000000", "Destination": "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun", "Fee": "20", "Flags": 2147483648, "Sequence": 13767283, "SigningPubKey": "036F3CFFE1EA77C1EEC5DCCA38C83E62E3AC068F8A16369620AF1D609BA5A620B2", "TransactionType": "Payment", "TxnSignature": "30450221009BD0D563B24E50B26A42F30455AD21C3D5CD4D80174C41F7B54969FFC08DE94C02201FC35320B56D56D1E34D1D281D48AC68CBEDDD6EE9DFA639CCB08BB251453A87", "hash": "F44393295DB860C6860769C16F5B23887762F09F87A8D1174E0FCFF9E7247F07" } })JSON";

// Expected v2 format: "transaction" renamed to "tx_json", "hash" moved to top level.
// All other fields are the same as in kDUMMY_TRANSACTION.
constexpr auto kDUMMY_TRANSACTION_V2 = R"JSON({ "hash": "F44393295DB860C6860769C16F5B23887762F09F87A8D1174E0FCFF9E7247F07", "tx_json": { "Account": "rh1HPuRVsYYvThxG2Bs1MfjmrVC73S16Fb", "Amount": "40000000", "Destination": "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun", "Fee": "20", "Flags": 2147483648, "Sequence": 13767283, "SigningPubKey": "036F3CFFE1EA77C1EEC5DCCA38C83E62E3AC068F8A16369620AF1D609BA5A620B2", "TransactionType": "Payment", "TxnSignature": "30450221009BD0D563B24E50B26A42F30455AD21C3D5CD4D80174C41F7B54969FFC08DE94C02201FC35320B56D56D1E34D1D281D48AC68CBEDDD6EE9DFA639CCB08BB251453A87" } })JSON";

}  // namespace

using namespace feed::impl;
namespace json = boost::json;
using namespace util::prometheus;

// Fixture used by the FeedProposedTransactionTest cases below.
// NOTE(review): testFeedPtr, sessionPtr and mockSessionPtr are not declared in
// this file - presumably they are members of FeedBaseTest; confirm in
// feed/FeedTestUtil.hpp.
// NOTE(review): FeedBaseTest appears here without template arguments, and the
// same is true of every std::make_shared, dynamic_cast, std::vector,
// std::shared_ptr and makeMock in this file. That looks like the same <...>
// stripping as in the include list; the code will not compile until the
// arguments are restored from the original source.
using FeedProposedTransactionTest = FeedBaseTest;

// A stream subscriber receives a published transaction exactly once, and
// nothing after it unsubscribes.
TEST_F(FeedProposedTransactionTest, ProposedTransaction)
{
    // Subscribing is expected to register exactly one disconnect handler.
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(sessionPtr);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 1);

    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    testFeedPtr->unsub(sessionPtr);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 0);

    // No new expectation: another matching send() would exceed the
    // exactly-once expectation set above and fail the test.
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// An account subscriber receives a transaction that names its account; a
// second session subscribed to an account absent from the transaction is given
// no send expectation.
TEST_F(FeedProposedTransactionTest, AccountProposedTransaction)
{
    auto const account = getAccountIdWithString(kACCOUNT1);
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(account, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);

    // Second session, subscribed to kACCOUNT3 (not part of the dummy transaction).
    web::SubscriptionContextPtr const sessionIdle = std::make_shared();
    auto const accountIdle = getAccountIdWithString(kACCOUNT3);
    EXPECT_CALL(*dynamic_cast(sessionIdle.get()), onDisconnect);
    testFeedPtr->sub(accountIdle, sessionIdle);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 2);

    // Only the kACCOUNT1 subscriber gets a send expectation.
    // NOTE(review): whether an unexpected send() on sessionIdle fails the test
    // depends on how the mock is wrapped (strict vs. nice) - confirm.
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    // Unsubscribe the kACCOUNT1 session; the idle subscription remains counted.
    testFeedPtr->unsub(account, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);
    // Published again with no new send expectation on mockSessionPtr.
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// A session subscribed to both the stream and a matching account is expected
// to be sent the message once per subscription.
TEST_F(FeedProposedTransactionTest, SubStreamAndAccount)
{
    auto const account = getAccountIdWithString(kACCOUNT1);
    // One disconnect handler per subscription kind.
    EXPECT_CALL(*mockSessionPtr, onDisconnect).Times(2);
    testFeedPtr->sub(account, sessionPtr);
    testFeedPtr->sub(sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 1);

    // Two sends for a single publish: one per subscription.
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION))).Times(2);
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    // Drop the account subscription; the stream subscription still delivers once.
    testFeedPtr->unsub(account, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    // Drop the stream subscription; the final publish has no send expectation.
    testFeedPtr->unsub(sessionPtr);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 0);
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// A session subscribed to two accounts that both appear in the transaction is
// expected to be sent the message once per publish, not once per account.
TEST_F(FeedProposedTransactionTest, AccountProposedTransactionDuplicate)
{
    auto const account = getAccountIdWithString(kACCOUNT1);
    auto const account2 = getAccountIdWithString(kACCOUNT2);

    EXPECT_CALL(*mockSessionPtr, onDisconnect).Times(2);
    testFeedPtr->sub(account, sessionPtr);
    testFeedPtr->sub(account2, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 2);

    // Both subscribed accounts match, yet a single send is expected.
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    // Unsubscribe account1; account2 still matches, so one send is expected.
    testFeedPtr->unsub(account, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    // Unsubscribe account2; the final publish has no send expectation.
    testFeedPtr->unsub(account2, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// Checks the subscriber counters across repeated subscriptions, multiple
// sessions and an unsubscribe for a subscription that was never made.
TEST_F(FeedProposedTransactionTest, Count)
{
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(sessionPtr);
    // Repeat the same subscription: the count must stay at 1, and a second
    // onDisconnect call would exceed the exactly-once expectation above.
    testFeedPtr->sub(sessionPtr);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 1);

    auto const account1 = getAccountIdWithString(kACCOUNT1);
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(account1, sessionPtr);
    // Repeat the same account subscription: the count must stay at 1.
    testFeedPtr->sub(account1, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);

    // A second session adds to the stream count.
    auto const sessionPtr2 = std::make_shared();
    EXPECT_CALL(*dynamic_cast(sessionPtr2.get()), onDisconnect);
    testFeedPtr->sub(sessionPtr2);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 2);

    // Each new (account, session) pair increments accountSubCount, including a
    // second session on an account that already has a subscriber.
    auto const account2 = getAccountIdWithString(kACCOUNT2);
    EXPECT_CALL(*dynamic_cast(sessionPtr2.get()), onDisconnect);
    testFeedPtr->sub(account2, sessionPtr2);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 2);
    EXPECT_CALL(*dynamic_cast(sessionPtr2.get()), onDisconnect);
    testFeedPtr->sub(account1, sessionPtr2);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 3);

    testFeedPtr->unsub(sessionPtr);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 1);

    // sessionPtr never subscribed to account2, so the count must not change.
    testFeedPtr->unsub(account2, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 3);

    // Remove the remaining account subscriptions one at a time.
    testFeedPtr->unsub(account1, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 2);
    testFeedPtr->unsub(account1, sessionPtr2);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);
    testFeedPtr->unsub(account2, sessionPtr2);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
}

// Invoking the callbacks passed to onDisconnect removes that session's
// subscriptions without any explicit unsub() call.
TEST_F(FeedProposedTransactionTest, AutoDisconnect)
{
    // Collect every slot passed to onDisconnect so the test can fire them later.
    std::vector sessionOnDisconnectSlots;
    ON_CALL(*mockSessionPtr, onDisconnect).WillByDefault([&sessionOnDisconnectSlots](auto slot) {
        sessionOnDisconnectSlots.push_back(slot);
    });

    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(sessionPtr);
    // Repeated subscription: the count must stay at 1.
    testFeedPtr->sub(sessionPtr);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 1);

    auto const account1 = getAccountIdWithString(kACCOUNT1);
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(account1, sessionPtr);
    // Repeated subscription: the count must stay at 1.
    testFeedPtr->sub(account1, sessionPtr);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);

    // Second session with its own slot collection.
    auto sessionPtr2 = std::make_shared();
    auto mockSessionPtr2 = dynamic_cast(sessionPtr2.get());
    std::vector session2OnDisconnectSlots;
    ON_CALL(*mockSessionPtr2, onDisconnect).WillByDefault([&session2OnDisconnectSlots](auto slot) {
        session2OnDisconnectSlots.push_back(slot);
    });

    EXPECT_CALL(*mockSessionPtr2, onDisconnect);
    testFeedPtr->sub(sessionPtr2);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 2);

    auto const account2 = getAccountIdWithString(kACCOUNT2);
    EXPECT_CALL(*mockSessionPtr2, onDisconnect);
    testFeedPtr->sub(account2, sessionPtr2);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 2);
    EXPECT_CALL(*mockSessionPtr2, onDisconnect);
    testFeedPtr->sub(account1, sessionPtr2);
    EXPECT_EQ(testFeedPtr->accountSubCount(), 3);

    // Fire the second session's slots: its stream subscription and both account
    // subscriptions are expected to disappear, leaving only the first session's.
    std::ranges::for_each(session2OnDisconnectSlots, [&sessionPtr2](auto& slot) { slot(sessionPtr2.get()); });
    sessionPtr2.reset();
    EXPECT_EQ(testFeedPtr->accountSubCount(), 1);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 1);

    // Fire the first session's slots: nothing should remain.
    std::ranges::for_each(sessionOnDisconnectSlots, [this](auto& slot) { slot(sessionPtr.get()); });
    sessionPtr.reset();
    EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
    EXPECT_EQ(testFeedPtr->transactionSubcount(), 0);
}

// A stream subscriber whose apiSubversion() returns 2 is expected to receive
// the v2 layout, although pub() is given the original layout.
TEST_F(FeedProposedTransactionTest, ProposedTransactionV2)
{
    // apiSubversion is expected to be queried exactly once.
    EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(2u));
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(sessionPtr);

    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    testFeedPtr->unsub(sessionPtr);
    // After unsubscribing: no new send or apiSubversion expectation.
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// Same as ProposedTransactionV2, for a session subscribed to an account
// instead of the stream.
TEST_F(FeedProposedTransactionTest, AccountProposedTransactionV2)
{
    auto const account = getAccountIdWithString(kACCOUNT1);
    EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(2u));
    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    testFeedPtr->sub(account, sessionPtr);

    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());

    testFeedPtr->unsub(account, sessionPtr);
    // After unsubscribing: no new send or apiSubversion expectation.
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// Two stream subscribers reporting different API versions are each expected to
// receive the layout for their own version from a single publish.
TEST_F(FeedProposedTransactionTest, MixedVersionSubscribers)
{
    auto sessionV2Ptr = std::make_shared();
    auto* mockSessionV2Ptr = dynamic_cast(sessionV2Ptr.get());

    EXPECT_CALL(*mockSessionPtr, onDisconnect);
    EXPECT_CALL(*mockSessionV2Ptr, onDisconnect);
    testFeedPtr->sub(sessionPtr);
    testFeedPtr->sub(sessionV2Ptr);

    // The fixture session reports version 1, the extra session version 2.
    EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(1u));
    EXPECT_CALL(*mockSessionV2Ptr, apiSubversion).WillOnce(testing::Return(2u));

    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION)));
    EXPECT_CALL(*mockSessionV2Ptr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// v2 counterpart of AccountProposedTransactionDuplicate.
TEST_F(FeedProposedTransactionTest, AccountProposedTransactionDuplicateV2)
{
    auto const account = getAccountIdWithString(kACCOUNT1);
    auto const account2 = getAccountIdWithString(kACCOUNT2);

    EXPECT_CALL(*mockSessionPtr, onDisconnect).Times(2);
    testFeedPtr->sub(account, sessionPtr);
    testFeedPtr->sub(account2, sessionPtr);

    // Both accounts are affected; v2 subscriber should receive the message only once (dedup).
    // WillOnce also limits apiSubversion to a single query for this publish.
    EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(2u));
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2)));
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// v2 counterpart of SubStreamAndAccount.
TEST_F(FeedProposedTransactionTest, SubStreamAndAccountV2)
{
    auto const account = getAccountIdWithString(kACCOUNT1);

    EXPECT_CALL(*mockSessionPtr, onDisconnect).Times(2);
    testFeedPtr->sub(account, sessionPtr);
    testFeedPtr->sub(sessionPtr);

    // Subscribed to both stream and account: receives message twice (matches v1 behaviour).
    // WillRepeatedly is used because the version may be queried for each delivery.
    EXPECT_CALL(*mockSessionPtr, apiSubversion).WillRepeatedly(testing::Return(2u));
    EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2))).Times(2);
    testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object());
}

// Fixture for the metric tests: combines WithMockPrometheus with
// SyncExecutionCtxFixture and owns one session and one feed instance.
// NOTE(review): ctx_ is not declared in this file - presumably it comes from
// SyncExecutionCtxFixture; confirm.
struct ProposedTransactionFeedMockPrometheusTest : WithMockPrometheus, SyncExecutionCtxFixture {
protected:
    // Session handed to sub()/unsub().
    web::SubscriptionContextPtr sessionPtr_ = std::make_shared();
    // Feed under test, constructed from ctx_.
    std::shared_ptr testFeedPtr_ = std::make_shared(ctx_);
    // The same session viewed as MockSession, for setting expectations.
    MockSession* mockSessionPtr_ = dynamic_cast(sessionPtr_.get());
};

// Explicit sub/unsub is expected to add 1 and then -1 to the gauge of the
// corresponding stream label.
TEST_F(ProposedTransactionFeedMockPrometheusTest, subUnsub)
{
    auto& counterTx = makeMock("subscriptions_current_number", "{stream=\"tx_proposed\"}");
    auto& counterAccount = makeMock("subscriptions_current_number", "{stream=\"account_proposed\"}");

    // Each gauge: exactly one increment and exactly one decrement.
    EXPECT_CALL(counterTx, add(1));
    EXPECT_CALL(counterTx, add(-1));
    EXPECT_CALL(counterAccount, add(1));
    EXPECT_CALL(counterAccount, add(-1));

    // Stream subscription round trip.
    EXPECT_CALL(*mockSessionPtr_, onDisconnect);
    testFeedPtr_->sub(sessionPtr_);
    testFeedPtr_->unsub(sessionPtr_);

    // Account subscription round trip.
    auto const account = getAccountIdWithString(kACCOUNT1);
    EXPECT_CALL(*mockSessionPtr_, onDisconnect);
    testFeedPtr_->sub(account, sessionPtr_);
    testFeedPtr_->unsub(account, sessionPtr_);
}

// Same gauge expectations as subUnsub, but the decrements are triggered by
// invoking the captured disconnect slots instead of calling unsub().
TEST_F(ProposedTransactionFeedMockPrometheusTest, AutoDisconnect)
{
    auto& counterTx = makeMock("subscriptions_current_number", "{stream=\"tx_proposed\"}");
    auto& counterAccount = makeMock("subscriptions_current_number", "{stream=\"account_proposed\"}");

    // Slots passed to onDisconnect by the two sub() calls below.
    std::vector sessionOnDisconnectSlots;

    EXPECT_CALL(counterTx, add(1));
    EXPECT_CALL(counterTx, add(-1));
    EXPECT_CALL(counterAccount, add(1));
    EXPECT_CALL(counterAccount, add(-1));

    EXPECT_CALL(*mockSessionPtr_, onDisconnect).WillOnce([&sessionOnDisconnectSlots](auto slot) {
        sessionOnDisconnectSlots.push_back(slot);
    });
    testFeedPtr_->sub(sessionPtr_);

    auto const account = getAccountIdWithString(kACCOUNT1);
    EXPECT_CALL(*mockSessionPtr_, onDisconnect).WillOnce([&sessionOnDisconnectSlots](auto slot) {
        sessionOnDisconnectSlots.push_back(slot);
    });
    testFeedPtr_->sub(account, sessionPtr_);

    // Fire both captured slots, then release the session.
    std::ranges::for_each(sessionOnDisconnectSlots, [this](auto& slot) { slot(sessionPtr_.get()); });
    sessionPtr_.reset();
}