Fix: Duplicate messages when subscribe both accounts and proposed_accounts (#1415)

Fix #1134
This commit is contained in:
cyan317
2024-05-21 09:04:46 +01:00
committed by GitHub
parent 36c6caa7c0
commit df17b429c5
8 changed files with 386 additions and 4 deletions

View File

@@ -240,7 +240,11 @@ SubscriptionSource::handleMessage(std::string const& message)
} else {
if (isForwarding_) {
if (object.contains(JS(transaction))) {
// Clio as rippled's proposed_transactions subscirber, will receive two jsons for each transaction
// 1 - Proposed transaction
// 2 - Validated transaction
// Only forward proposed transaction, validated transactions are sent by Clio itself
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
subscriptions_->forwardValidation(object);

View File

@@ -59,24 +59,32 @@ void
SubscriptionManager::subProposedTransactions(SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.sub(subscriber);
// proposed_transactions subscribers not only receive the transaction json when it is proposed, but also the
// transaction json when it is validated. So the subscriber also subscribes to the transaction feed.
transactionFeed_.subProposed(subscriber);
}
void
SubscriptionManager::unsubProposedTransactions(SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.unsub(subscriber);
transactionFeed_.unsubProposed(subscriber);
}
void
SubscriptionManager::subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.sub(account, subscriber);
// Same as proposed_transactions subscribers, proposed_account subscribers also subscribe to the transaction feed to
// receive validated transaction feed. TransactionFeed class will filter out the sessions that have been sent to.
transactionFeed_.subProposed(account, subscriber);
}
void
SubscriptionManager::unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.unsub(account, subscriber);
transactionFeed_.unsubProposed(account, subscriber);
}
void

View File

@@ -102,6 +102,7 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
auto affectedAccounts = std::unordered_set<ripple::AccountID>(accounts.cbegin(), accounts.cend());
boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
notified_.clear();
signal_.emit(pubMsg);
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
// However, if the same connection subscribe both stream and account, it will still receive the message twice.

View File

@@ -93,6 +93,27 @@ TransactionFeed::sub(ripple::AccountID const& account, SubscriberSharedPtr const
}
}
void
TransactionFeed::subProposed(SubscriberSharedPtr const& subscriber)
{
auto const added = txProposedsignal_.connectTrackableSlot(subscriber, TransactionSlot(*this, subscriber));
if (added) {
subscriber->onDisconnect.connect([this](SubscriberPtr connection) { unsubProposedInternal(connection); });
}
}
void
TransactionFeed::subProposed(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
auto const added =
accountProposedSignal_.connectTrackableSlot(subscriber, account, TransactionSlot(*this, subscriber));
if (added) {
subscriber->onDisconnect.connect([this, account](SubscriberPtr connection) {
unsubProposedInternal(account, connection);
});
}
}
void
TransactionFeed::sub(ripple::Book const& book, SubscriberSharedPtr const& subscriber)
{
@@ -116,6 +137,18 @@ TransactionFeed::unsub(ripple::AccountID const& account, SubscriberSharedPtr con
unsubInternal(account, subscriber.get());
}
void
TransactionFeed::unsubProposed(SubscriberSharedPtr const& subscriber)
{
unsubProposedInternal(subscriber.get());
}
void
TransactionFeed::unsubProposed(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
unsubProposedInternal(account, subscriber.get());
}
void
TransactionFeed::unsub(ripple::Book const& book, SubscriberSharedPtr const& subscriber)
{
@@ -251,14 +284,19 @@ TransactionFeed::pub(
affectedBooks = std::move(affectedBooks)]() {
notified_.clear();
signal_.emit(allVersionsMsgs);
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
// rippled SENDS the same message twice
notified_.clear();
// check duplicate for accounts, this prevents sending the same message multiple times if it touches
// multiple accounts watched by the same connection
txProposedsignal_.emit(allVersionsMsgs);
notified_.clear();
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection
for (auto const& account : affectedAccounts) {
accountSignal_.emit(account, allVersionsMsgs);
accountProposedSignal_.emit(account, allVersionsMsgs);
}
notified_.clear();
// check duplicate for books, this prevents sending the same message multiple times if it touches multiple
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple
// books watched by the same connection
for (auto const& book : affectedBooks) {
bookSignal_.emit(book, allVersionsMsgs);
@@ -285,6 +323,18 @@ TransactionFeed::unsubInternal(ripple::AccountID const& account, SubscriberPtr s
}
}
void
TransactionFeed::unsubProposedInternal(SubscriberPtr subscriber)
{
txProposedsignal_.disconnect(subscriber);
}
void
TransactionFeed::unsubProposedInternal(ripple::AccountID const& account, SubscriberPtr subscriber)
{
accountProposedSignal_.disconnect(subscriber, account);
}
void
TransactionFeed::unsubInternal(ripple::Book const& book, SubscriberPtr subscriber)
{

View File

@@ -72,6 +72,10 @@ class TransactionFeed {
TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
TrackableSignal<Subscriber, AllVersionTransactionsType const&> signal_;
// Signals for proposed tx subscribers
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountProposedSignal_;
TrackableSignal<Subscriber, AllVersionTransactionsType const&> txProposedsignal_;
std::unordered_set<SubscriberPtr>
notified_; // Used by slots to prevent double notifications if tx contains multiple subscribed accounts
@@ -111,6 +115,22 @@ public:
void
sub(ripple::Book const& book, SubscriberSharedPtr const& subscriber);
/**
* @brief Subscribe to the transaction feed for proposed transaction stream.
* @param subscriber
*/
void
subProposed(SubscriberSharedPtr const& subscriber);
/**
* @brief Subscribe to the transaction feed for proposed account, only receive the feed when particular account is
* affected.
* @param subscriber
* @param account The account to watch.
*/
void
subProposed(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
/**
* @brief Unsubscribe to the transaction feed.
* @param subscriber
@@ -126,6 +146,21 @@ public:
void
unsub(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
/**
* @brief Unsubscribe to the transaction feed for proposed transaction stream.
* @param subscriber
*/
void
unsubProposed(SubscriberSharedPtr const& subscriber);
/**
* @brief Unsubscribe to the transaction for particular proposed account.
* @param subscriber
* @param account The account to unsubscribe.
*/
void
unsubProposed(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber);
/**
* @brief Unsubscribe to the transaction feed for particular order book.
* @param subscriber
@@ -170,6 +205,12 @@ private:
void
unsubInternal(ripple::AccountID const& account, SubscriberPtr subscriber);
void
unsubProposedInternal(SubscriberPtr subscriber);
void
unsubProposedInternal(ripple::AccountID const& account, SubscriberPtr subscriber);
void
unsubInternal(ripple::Book const& book, SubscriberPtr subscriber);
};

View File

@@ -444,6 +444,22 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
{
subscriptionSource_.setForwarding(true);
boost::json::object const message = {{"transaction", "some_transaction_data"}, {"meta", "some_meta_data"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0);
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {

View File

@@ -442,8 +442,79 @@ TEST_F(SubscriptionManagerTest, ProposedTransactionTest)
"Destination":"rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun"
}
})";
constexpr static auto OrderbookPublish =
R"({
"transaction":
{
"Account":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"Amount":"1",
"DeliverMax":"1",
"Destination":"rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun",
"Fee":"1",
"Sequence":32,
"SigningPubKey":"74657374",
"TransactionType":"Payment",
"hash":"51D2AAA6B8E4E16EF22F6424854283D8391B56875858A711B8CE4D5B9A422CC2",
"date":0
},
"meta":
{
"AffectedNodes":
[
{
"ModifiedNode":
{
"FinalFields":
{
"TakerGets":"3",
"TakerPays":
{
"currency":"0158415500000000C1F76FF6ECB0BAC600000000",
"issuer":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"value":"1"
}
},
"LedgerEntryType":"Offer",
"PreviousFields":
{
"TakerGets":"1",
"TakerPays":
{
"currency":"0158415500000000C1F76FF6ECB0BAC600000000",
"issuer":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"value":"3"
}
}
}
}
],
"TransactionIndex":22,
"TransactionResult":"tesSUCCESS",
"delivered_amount":"unavailable"
},
"type":"transaction",
"validated":true,
"status":"closed",
"ledger_index":33,
"ledger_hash":"4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652",
"engine_result_code":0,
"engine_result":"tesSUCCESS",
"close_time_iso": "2000-01-01T00:00:00Z",
"engine_result_message":"The transaction was applied. Only final in a validated ledger."
})";
EXPECT_CALL(*sessionPtr, send(SharedStringJsonEq(dummyTransaction))).Times(2);
EXPECT_CALL(*sessionPtr, send(SharedStringJsonEq(OrderbookPublish))).Times(2);
SubscriptionManagerPtr->forwardProposedTransaction(json::parse(dummyTransaction).get_object());
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
auto obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
auto const metaObj = CreateMetaDataForBookChange(CURRENCY, ACCOUNT1, 22, 3, 1, 1, 3);
trans1.metadata = metaObj.getSerializer().peekData();
SubscriptionManagerPtr->pubTransaction(trans1, ledgerinfo);
ctx.run();
// unsub account1
@@ -452,3 +523,57 @@ TEST_F(SubscriptionManagerTest, ProposedTransactionTest)
SubscriptionManagerPtr->unsubProposedTransactions(session);
EXPECT_EQ(SubscriptionManagerPtr->report()["transactions_proposed"], 0);
}
TEST_F(SubscriptionManagerTest, DuplicateResponseSubTxAndProposedTx)
{
SubscriptionManagerPtr->subProposedTransactions(session);
SubscriptionManagerPtr->subTransactions(session);
EXPECT_EQ(SubscriptionManagerPtr->report()["transactions"], 1);
EXPECT_EQ(SubscriptionManagerPtr->report()["transactions_proposed"], 1);
EXPECT_CALL(*sessionPtr, send(testing::_)).Times(2);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
auto obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
auto const metaObj = CreateMetaDataForBookChange(CURRENCY, ACCOUNT1, 22, 3, 1, 1, 3);
trans1.metadata = metaObj.getSerializer().peekData();
SubscriptionManagerPtr->pubTransaction(trans1, ledgerinfo);
ctx.run();
SubscriptionManagerPtr->unsubTransactions(session);
EXPECT_EQ(SubscriptionManagerPtr->report()["transactions"], 0);
SubscriptionManagerPtr->unsubProposedTransactions(session);
EXPECT_EQ(SubscriptionManagerPtr->report()["transactions_proposed"], 0);
}
TEST_F(SubscriptionManagerTest, NoDuplicateResponseSubAccountAndProposedAccount)
{
auto const account = GetAccountIDWithString(ACCOUNT1);
SubscriptionManagerPtr->subProposedAccount(account, session);
SubscriptionManagerPtr->subAccount(account, session);
EXPECT_EQ(SubscriptionManagerPtr->report()["accounts_proposed"], 1);
EXPECT_EQ(SubscriptionManagerPtr->report()["account"], 1);
EXPECT_CALL(*sessionPtr, send(testing::_)).Times(1);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
auto obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
auto const metaObj = CreateMetaDataForBookChange(CURRENCY, ACCOUNT1, 22, 3, 1, 1, 3);
trans1.metadata = metaObj.getSerializer().peekData();
SubscriptionManagerPtr->pubTransaction(trans1, ledgerinfo);
ctx.run();
// unsub account1
SubscriptionManagerPtr->unsubProposedAccount(account, session);
EXPECT_EQ(SubscriptionManagerPtr->report()["accounts_proposed"], 0);
SubscriptionManagerPtr->unsubAccount(account, session);
EXPECT_EQ(SubscriptionManagerPtr->report()["account"], 0);
}

View File

@@ -187,6 +187,28 @@ TEST_F(FeedTransactionTest, SubTransactionV1)
ctx.run();
}
TEST_F(FeedTransactionTest, SubTransactionForProposedTx)
{
testFeedPtr->subProposed(sessionPtr);
EXPECT_EQ(testFeedPtr->transactionSubCount(), 0);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
ripple::STObject const obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
trans1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT1, ACCOUNT2, 110, 30, 22).getSerializer().peekData();
testFeedPtr->pub(trans1, ledgerinfo, backend);
EXPECT_CALL(*mockSessionPtr, send(SharedStringJsonEq(TRAN_V1))).Times(1);
ctx.run();
testFeedPtr->unsubProposed(sessionPtr);
testFeedPtr->pub(trans1, ledgerinfo, backend);
ctx.restart();
ctx.run();
}
TEST_F(FeedTransactionTest, SubTransactionV2)
{
sessionPtr->apiSubVersion = 2;
@@ -237,6 +259,29 @@ TEST_F(FeedTransactionTest, SubAccountV1)
ctx.run();
}
TEST_F(FeedTransactionTest, SubForProposedAccount)
{
auto const account = GetAccountIDWithString(ACCOUNT1);
testFeedPtr->subProposed(account, sessionPtr);
EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
ripple::STObject const obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
trans1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT1, ACCOUNT2, 110, 30, 22).getSerializer().peekData();
testFeedPtr->pub(trans1, ledgerinfo, backend);
EXPECT_CALL(*mockSessionPtr, send(SharedStringJsonEq(TRAN_V1))).Times(1);
ctx.run();
testFeedPtr->unsubProposed(account, sessionPtr);
testFeedPtr->pub(trans1, ledgerinfo, backend);
ctx.restart();
ctx.run();
}
TEST_F(FeedTransactionTest, SubAccountV2)
{
auto const account = GetAccountIDWithString(ACCOUNT1);
@@ -930,6 +975,98 @@ TEST_F(FeedTransactionTest, SubTransactionOfferCreationGlobalFrozen)
ctx.run();
}
TEST_F(FeedTransactionTest, SubBothProposedAndValidatedAccount)
{
auto const account = GetAccountIDWithString(ACCOUNT1);
testFeedPtr->sub(account, sessionPtr);
testFeedPtr->subProposed(account, sessionPtr);
EXPECT_EQ(testFeedPtr->accountSubCount(), 1);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
ripple::STObject const obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
trans1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT1, ACCOUNT2, 110, 30, 22).getSerializer().peekData();
testFeedPtr->pub(trans1, ledgerinfo, backend);
EXPECT_CALL(*mockSessionPtr, send(SharedStringJsonEq(TRAN_V1))).Times(1);
ctx.run();
testFeedPtr->unsub(account, sessionPtr);
testFeedPtr->unsubProposed(account, sessionPtr);
EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
testFeedPtr->pub(trans1, ledgerinfo, backend);
ctx.restart();
ctx.run();
}
TEST_F(FeedTransactionTest, SubBothProposedAndValidated)
{
testFeedPtr->sub(sessionPtr);
testFeedPtr->subProposed(sessionPtr);
EXPECT_EQ(testFeedPtr->transactionSubCount(), 1);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
ripple::STObject const obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
trans1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT1, ACCOUNT2, 110, 30, 22).getSerializer().peekData();
testFeedPtr->pub(trans1, ledgerinfo, backend);
EXPECT_CALL(*mockSessionPtr, send(SharedStringJsonEq(TRAN_V1))).Times(2);
ctx.run();
testFeedPtr->unsub(sessionPtr);
testFeedPtr->unsubProposed(sessionPtr);
testFeedPtr->pub(trans1, ledgerinfo, backend);
ctx.restart();
ctx.run();
}
TEST_F(FeedTransactionTest, SubProposedDisconnect)
{
testFeedPtr->subProposed(sessionPtr);
EXPECT_EQ(testFeedPtr->transactionSubCount(), 0);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
ripple::STObject const obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
trans1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT1, ACCOUNT2, 110, 30, 22).getSerializer().peekData();
testFeedPtr->pub(trans1, ledgerinfo, backend);
EXPECT_CALL(*mockSessionPtr, send(SharedStringJsonEq(TRAN_V1))).Times(1);
ctx.run();
sessionPtr.reset();
testFeedPtr->pub(trans1, ledgerinfo, backend);
ctx.restart();
ctx.run();
}
TEST_F(FeedTransactionTest, SubProposedAccountDisconnect)
{
auto const account = GetAccountIDWithString(ACCOUNT1);
testFeedPtr->subProposed(account, sessionPtr);
EXPECT_EQ(testFeedPtr->accountSubCount(), 0);
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 33);
auto trans1 = TransactionAndMetadata();
ripple::STObject const obj = CreatePaymentTransactionObject(ACCOUNT1, ACCOUNT2, 1, 1, 32);
trans1.transaction = obj.getSerializer().peekData();
trans1.ledgerSequence = 32;
trans1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT1, ACCOUNT2, 110, 30, 22).getSerializer().peekData();
testFeedPtr->pub(trans1, ledgerinfo, backend);
EXPECT_CALL(*mockSessionPtr, send(SharedStringJsonEq(TRAN_V1))).Times(1);
ctx.run();
sessionPtr.reset();
testFeedPtr->pub(trans1, ledgerinfo, backend);
ctx.restart();
ctx.run();
}
struct TransactionFeedMockPrometheusTest : WithMockPrometheus, SyncAsioContextTest {
protected:
std::shared_ptr<web::ConnectionBase> sessionPtr;