diff --git a/src/etl/impl/SubscriptionSource.cpp b/src/etl/impl/SubscriptionSource.cpp index 1b6de5af4..e406a2391 100644 --- a/src/etl/impl/SubscriptionSource.cpp +++ b/src/etl/impl/SubscriptionSource.cpp @@ -233,8 +233,8 @@ SubscriptionSource::handleMessage(std::string const& message) } else { if (isForwarding_) { // Clio as rippled's proposed_transactions subscriber, will receive two jsons for - // each transaction 1 - Proposed transaction 2 - Validated transaction Only forward - // proposed transaction, validated transactions are sent by Clio itself + // each transaction 1 - Proposed transaction 2 - Validated transaction. + // Only forward proposed transaction, validated transactions are sent by Clio itself if (object.contains(JS(transaction)) and !object.contains(JS(meta))) { LOG(log_.debug()) << "Forwarding proposed transaction: " << object; subscriptions_->forwardProposedTransaction(object); diff --git a/src/feed/impl/ProposedTransactionFeed.cpp b/src/feed/impl/ProposedTransactionFeed.cpp index 9eedb2b7d..1b2d65412 100644 --- a/src/feed/impl/ProposedTransactionFeed.cpp +++ b/src/feed/impl/ProposedTransactionFeed.cpp @@ -1,12 +1,15 @@ #include "feed/impl/ProposedTransactionFeed.hpp" #include "feed/Types.hpp" +#include "rpc/JS.hpp" #include "rpc/RPCHelpers.hpp" #include "util/log/Logger.hpp" #include #include +#include #include +#include #include #include @@ -16,17 +19,30 @@ namespace feed::impl { +void +ProposedTransactionFeed::ProposedTransactionSlot::operator()( + AllVersionsMsgsPtrType const& allVersionMsgs +) const +{ + if (auto connectionPtr = subscriptionContextWeakPtr_.lock()) { + if (feed_.get().notified_.contains(connectionPtr.get())) + return; + + feed_.get().notified_.insert(connectionPtr.get()); + + if (connectionPtr->apiSubversion() < 2u) { + connectionPtr->send(std::shared_ptr(allVersionMsgs, &allVersionMsgs->v1)); + } else { + connectionPtr->send(std::shared_ptr(allVersionMsgs, &allVersionMsgs->v2)); + } + } +} + void ProposedTransactionFeed::sub(SubscriberSharedPtr const& subscriber) { - auto const weakPtr = std::weak_ptr(subscriber); - auto const added = signal_.connectTrackableSlot( - subscriber, [weakPtr](std::shared_ptr const& msg) { - if (auto connectionPtr = weakPtr.lock()) { - connectionPtr->send(msg); - } - } - ); + auto const added = + signal_.connectTrackableSlot(subscriber, ProposedTransactionSlot(*this, subscriber)); if (added) { LOG(logger_.info()) << subscriber->tag() << "Subscribed tx_proposed"; @@ -41,19 +57,10 @@ ProposedTransactionFeed::sub( SubscriberSharedPtr const& subscriber ) { - auto const weakPtr = std::weak_ptr(subscriber); auto const added = accountSignal_.connectTrackableSlot( - subscriber, account, [this, weakPtr](std::shared_ptr const& msg) { - if (auto connectionPtr = weakPtr.lock()) { - // Check if this connection already sent - if (notified_.contains(connectionPtr.get())) - return; - - notified_.insert(connectionPtr.get()); - connectionPtr->send(msg); - } - } + subscriber, account, ProposedTransactionSlot(*this, subscriber) ); + if (added) { LOG(logger_.info()) << subscriber->tag() << "Subscribed accounts_proposed " << account; ++subAccountCount_.get(); @@ -81,27 +88,48 @@ ProposedTransactionFeed::unsub( void ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson) { - auto pubMsg = std::make_shared(boost::json::serialize(receivedTxJson)); + // v2: rename "transaction" → "tx_json", move "hash" to top level + auto const v2Json = [&]() { + boost::json::object v2Json = receivedTxJson; + if (v2Json.contains(JS(transaction))) { + boost::json::value txVal = v2Json.at(JS(transaction)); + v2Json.erase(JS(transaction)); + if (txVal.is_object()) { + auto& txObj = txVal.as_object(); + if (txObj.contains(JS(hash))) { + v2Json[JS(hash)] = txObj.at(JS(hash)); + txObj.erase(JS(hash)); + } + } + v2Json[JS(tx_json)] = std::move(txVal); + } + return v2Json; + }(); - auto const transaction = receivedTxJson.at("transaction").as_object(); + auto const allVersionMsgs = std::make_shared( + // v1: forward as-is (rippled sends "transaction" key) + boost::json::serialize(receivedTxJson), + boost::json::serialize(v2Json) + ); + + auto const transaction = receivedTxJson.at(JS(transaction)).as_object(); auto const accounts = rpc::getAccountsFromTransaction(transaction); auto affectedAccounts = std::unordered_set(accounts.cbegin(), accounts.cend()); - [[maybe_unused]] auto task = strand_.execute( - [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() { + [[maybe_unused]] auto task = + strand_.execute([this, allVersionMsgs, affectedAccounts = std::move(affectedAccounts)]() { notified_.clear(); - signal_.emit(pubMsg); + signal_.emit(allVersionMsgs); // Prevent the same connection from receiving the same message twice if it is subscribed - // to multiple accounts However, if the same connection subscribe both stream and + // to multiple accounts. However, if the same connection subscribes both stream and // account, it will still receive the message twice. notified_ can be cleared before // signal_ emit to improve this, but let's keep it as is for now, since rippled acts // like this. notified_.clear(); for (auto const& account : affectedAccounts) - accountSignal_.emit(account, pubMsg); - } - ); + accountSignal_.emit(account, allVersionMsgs); + }); } std::uint64_t diff --git a/src/feed/impl/ProposedTransactionFeed.hpp b/src/feed/impl/ProposedTransactionFeed.hpp index f34358472..517b0cdb8 100644 --- a/src/feed/impl/ProposedTransactionFeed.hpp +++ b/src/feed/impl/ProposedTransactionFeed.hpp @@ -27,9 +27,33 @@ namespace feed::impl { /** * @brief Feed that publishes the Proposed Transactions. - * @note Be aware that the Clio only forwards this stream, not respect api_version. */ class ProposedTransactionFeed { + // Hold two versions of transaction messages + struct AllVersionMsgsType { + std::string v1; + std::string v2; + }; + + using AllVersionsMsgsPtrType = std::shared_ptr; + + class ProposedTransactionSlot { + std::reference_wrapper feed_; + std::weak_ptr subscriptionContextWeakPtr_; + + public: + ProposedTransactionSlot( + ProposedTransactionFeed& feed, + SubscriberSharedPtr const& connection + ) + : feed_(feed), subscriptionContextWeakPtr_(connection) + { + } + + void + operator()(AllVersionsMsgsPtrType const& allVersionMsgs) const; + }; + util::Logger logger_{"Subscriptions"}; std::unordered_set notified_; // Used by slots to prevent double notifications @@ -38,10 +62,16 @@ class ProposedTransactionFeed { std::reference_wrapper subAllCount_; std::reference_wrapper subAccountCount_; - TrackableSignalMap> accountSignal_; - TrackableSignal> signal_; + TrackableSignalMap accountSignal_; + TrackableSignal signal_; public: + /** + * @brief Move constructor is deleted because ProposedTransactionSlot takes + * ProposedTransactionFeed by reference. + */ + ProposedTransactionFeed(ProposedTransactionFeed&&) = delete; + /** * @brief Construct a Proposed Transaction Feed object. * @param executionCtx The actual publish will be called in the strand of this. diff --git a/src/feed/impl/TransactionFeed.cpp b/src/feed/impl/TransactionFeed.cpp index af889612e..5b48f15a9 100644 --- a/src/feed/impl/TransactionFeed.cpp +++ b/src/feed/impl/TransactionFeed.cpp @@ -37,7 +37,9 @@ namespace feed::impl { void -TransactionFeed::TransactionSlot::operator()(AllVersionTransactionsType const& allVersionMsgs) const +TransactionFeed::TransactionSlot::operator()( + std::shared_ptr const& allVersionMsgs +) const { if (auto connection = subscriptionContextWeakPtr.lock(); connection) { // Check if this connection already sent @@ -47,10 +49,10 @@ TransactionFeed::TransactionSlot::operator()(AllVersionTransactionsType const& a feed.get().notified_.insert(connection.get()); if (connection->apiSubversion() < 2u) { - connection->send(allVersionMsgs[0]); + connection->send(std::shared_ptr(allVersionMsgs, &allVersionMsgs->v1)); return; } - connection->send(allVersionMsgs[1]); + connection->send(std::shared_ptr(allVersionMsgs, &allVersionMsgs->v2)); } } @@ -263,10 +265,9 @@ TransactionFeed::pub( return pubObj; }; - AllVersionTransactionsType allVersionsMsgs{ - std::make_shared(boost::json::serialize(genJsonByVersion(1u))), - std::make_shared(boost::json::serialize(genJsonByVersion(2u))) - }; + auto allVersionsMsgs = std::make_shared( + boost::json::serialize(genJsonByVersion(1u)), boost::json::serialize(genJsonByVersion(2u)) + ); auto const affectedAccountsFlat = meta->getAffectedAccounts(); auto affectedAccounts = std::unordered_set( diff --git a/src/feed/impl/TransactionFeed.hpp b/src/feed/impl/TransactionFeed.hpp index 7ca85a6c7..d55bc0d01 100644 --- a/src/feed/impl/TransactionFeed.hpp +++ b/src/feed/impl/TransactionFeed.hpp @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -30,7 +29,10 @@ namespace feed::impl { class TransactionFeed { // Hold two versions of transaction messages - using AllVersionTransactionsType = std::array, 2>; + struct AllVersionsMsgsType { + std::string v1; + std::string v2; + }; struct TransactionSlot { std::reference_wrapper feed; @@ -42,7 +44,7 @@ class TransactionFeed { } void - operator()(AllVersionTransactionsType const& allVersionMsgs) const; + operator()(std::shared_ptr const& allVersionMsgs) const; }; util::Logger logger_{"Subscriptions"}; @@ -52,15 +54,16 @@ class TransactionFeed { std::reference_wrapper subAccountCount_; std::reference_wrapper subBookCount_; - TrackableSignalMap + TrackableSignalMap const&> accountSignal_; - TrackableSignalMap bookSignal_; - TrackableSignal signal_; + TrackableSignalMap const&> + bookSignal_; + TrackableSignal const&> signal_; // Signals for proposed tx subscribers - TrackableSignalMap + TrackableSignalMap const&> accountProposedSignal_; - TrackableSignal txProposedSignal_; + TrackableSignal const&> txProposedSignal_; std::unordered_set notified_; // Used by slots to prevent double notifications // if tx contains multiple subscribed accounts diff --git a/tests/unit/feed/ProposedTransactionFeedTests.cpp b/tests/unit/feed/ProposedTransactionFeedTests.cpp index 677d4b2f1..96095caa2 100644 --- a/tests/unit/feed/ProposedTransactionFeedTests.cpp +++ b/tests/unit/feed/ProposedTransactionFeedTests.cpp @@ -36,6 +36,23 @@ constexpr auto kDUMMY_TRANSACTION = } })JSON"; +// Expected v2 format: "transaction" renamed to "tx_json", "hash" moved to top level +constexpr auto kDUMMY_TRANSACTION_V2 = + R"JSON({ + "hash": "F44393295DB860C6860769C16F5B23887762F09F87A8D1174E0FCFF9E7247F07", + "tx_json": { + "Account": "rh1HPuRVsYYvThxG2Bs1MfjmrVC73S16Fb", + "Amount": "40000000", + "Destination": "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun", + "Fee": "20", + "Flags": 2147483648, + "Sequence": 13767283, + "SigningPubKey": "036F3CFFE1EA77C1EEC5DCCA38C83E62E3AC068F8A16369620AF1D609BA5A620B2", + "TransactionType": "Payment", + "TxnSignature": "30450221009BD0D563B24E50B26A42F30455AD21C3D5CD4D80174C41F7B54969FFC08DE94C02201FC35320B56D56D1E34D1D281D48AC68CBEDDD6EE9DFA639CCB08BB251453A87" + } + })JSON"; + } // namespace using namespace feed::impl; @@ -240,6 +257,80 @@ TEST_F(FeedProposedTransactionTest, AutoDisconnect) EXPECT_EQ(testFeedPtr->transactionSubcount(), 0); } +TEST_F(FeedProposedTransactionTest, ProposedTransactionV2) +{ + EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(2u)); + EXPECT_CALL(*mockSessionPtr, onDisconnect); + testFeedPtr->sub(sessionPtr); + + EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2))); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); + + testFeedPtr->unsub(sessionPtr); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); +} + +TEST_F(FeedProposedTransactionTest, AccountProposedTransactionV2) +{ + auto const account = getAccountIdWithString(kACCOUNT1); + + EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(2u)); + EXPECT_CALL(*mockSessionPtr, onDisconnect); + testFeedPtr->sub(account, sessionPtr); + + EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2))); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); + + testFeedPtr->unsub(account, sessionPtr); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); +} + +TEST_F(FeedProposedTransactionTest, MixedVersionSubscribers) +{ + auto sessionV2Ptr = std::make_shared(); + auto* mockSessionV2Ptr = dynamic_cast(sessionV2Ptr.get()); + + EXPECT_CALL(*mockSessionPtr, onDisconnect); + EXPECT_CALL(*mockSessionV2Ptr, onDisconnect); + testFeedPtr->sub(sessionPtr); + testFeedPtr->sub(sessionV2Ptr); + + EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(1u)); + EXPECT_CALL(*mockSessionV2Ptr, apiSubversion).WillOnce(testing::Return(2u)); + EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION))); + EXPECT_CALL(*mockSessionV2Ptr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2))); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); +} + +TEST_F(FeedProposedTransactionTest, AccountProposedTransactionDuplicateV2) +{ + auto const account = getAccountIdWithString(kACCOUNT1); + auto const account2 = getAccountIdWithString(kACCOUNT2); + + EXPECT_CALL(*mockSessionPtr, onDisconnect).Times(2); + testFeedPtr->sub(account, sessionPtr); + testFeedPtr->sub(account2, sessionPtr); + + // Both accounts are affected; v2 subscriber should receive the message only once (dedup) + EXPECT_CALL(*mockSessionPtr, apiSubversion).WillOnce(testing::Return(2u)); + EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2))); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); +} + +TEST_F(FeedProposedTransactionTest, SubStreamAndAccountV2) +{ + auto const account = getAccountIdWithString(kACCOUNT1); + + EXPECT_CALL(*mockSessionPtr, onDisconnect).Times(2); + testFeedPtr->sub(account, sessionPtr); + testFeedPtr->sub(sessionPtr); + + // Subscribed to both stream and account: receives message twice (matches v1 behaviour) + EXPECT_CALL(*mockSessionPtr, apiSubversion).WillRepeatedly(testing::Return(2u)); + EXPECT_CALL(*mockSessionPtr, send(sharedStringJsonEq(kDUMMY_TRANSACTION_V2))).Times(2); + testFeedPtr->pub(json::parse(kDUMMY_TRANSACTION).get_object()); +} + struct ProposedTransactionFeedMockPrometheusTest : WithMockPrometheus, SyncExecutionCtxFixture { protected: web::SubscriptionContextPtr sessionPtr_ = std::make_shared();