diff --git a/src/ripple/app/ledger/BookListeners.cpp b/src/ripple/app/ledger/BookListeners.cpp index 32bdfe15c..abc22d8f2 100644 --- a/src/ripple/app/ledger/BookListeners.cpp +++ b/src/ripple/app/ledger/BookListeners.cpp @@ -24,35 +24,44 @@ namespace ripple { -void BookListeners::addSubscriber (InfoSub::ref sub) +void +BookListeners::addSubscriber(InfoSub::ref sub) { - std::lock_guard sl (mLock); - mListeners[sub->getSeq ()] = sub; + std::lock_guard sl(mLock); + mListeners[sub->getSeq()] = sub; } -void BookListeners::removeSubscriber (std::uint64_t seq) +void +BookListeners::removeSubscriber(std::uint64_t seq) { - std::lock_guard sl (mLock); - mListeners.erase (seq); + std::lock_guard sl(mLock); + mListeners.erase(seq); } -void BookListeners::publish (Json::Value const& jvObj) +void +BookListeners::publish( + Json::Value const& jvObj, + hash_set& havePublished) { - std::lock_guard sl (mLock); - auto it = mListeners.cbegin (); + std::lock_guard sl(mLock); + auto it = mListeners.cbegin(); - while (it != mListeners.cend ()) + while (it != mListeners.cend()) { - InfoSub::pointer p = it->second.lock (); + InfoSub::pointer p = it->second.lock(); if (p) { - p->send (jvObj, true); + // Only publish jvObj if this is the first occurence + if(havePublished.emplace(p->getSeq()).second) + { + p->send(jvObj, true); + } ++it; } else - it = mListeners.erase (it); + it = mListeners.erase(it); } } -} // ripple +} // namespace ripple diff --git a/src/ripple/app/ledger/BookListeners.h b/src/ripple/app/ledger/BookListeners.h index e24adae56..fc45595dc 100644 --- a/src/ripple/app/ledger/BookListeners.h +++ b/src/ripple/app/ledger/BookListeners.h @@ -32,11 +32,33 @@ class BookListeners public: using pointer = std::shared_ptr; - BookListeners () {} + BookListeners() + { + } - void addSubscriber (InfoSub::ref sub); - void removeSubscriber (std::uint64_t sub); - void publish (Json::Value const& jvObj); + /** Add a new subscription for this book + */ + void + addSubscriber(InfoSub::ref sub); + + /** Stop publishing to a subscriber + */ + void + removeSubscriber(std::uint64_t sub); + + /** Publish a transaction to subscribers + + Publish a transaction to clients subscribed to changes on this book. + Uses havePublished to prevent sending duplicate transactions to clients + that have subscribed to multiple books. + + @param jvObj JSON transaction data to publish + @param havePublished InfoSub sequence numbers that have already + published this transaction. + + */ + void + publish(Json::Value const& jvObj, hash_set& havePublished); private: std::recursive_mutex mLock; @@ -44,6 +66,6 @@ private: hash_map mListeners; }; -} // ripple +} // namespace ripple #endif diff --git a/src/ripple/app/ledger/OrderBookDB.cpp b/src/ripple/app/ledger/OrderBookDB.cpp index 6ec52fce5..f28c0afb0 100644 --- a/src/ripple/app/ledger/OrderBookDB.cpp +++ b/src/ripple/app/ledger/OrderBookDB.cpp @@ -239,9 +239,15 @@ void OrderBookDB::processTxn ( const AcceptedLedgerTx& alTx, Json::Value const& jvObj) { std::lock_guard sl (mLock); - if (alTx.getResult () == tesSUCCESS) { + // For this particular transaction, maintain the set of unique + // subscriptions that have already published it. This prevents sending + // the transaction multiple times if it touches multiple ltOFFER + // entries for the same book, or if it touches multiple books and a + // single client has subscribed to those books. + hash_set havePublished; + // Check if this is an offer or an offer cancel or a payment that // consumes an offer. // Check to see what the meta looks like. @@ -272,12 +278,14 @@ void OrderBookDB::processTxn ( data->isFieldPresent (sfTakerGets)) { // determine the OrderBook - auto listeners = getBookListeners ( - {data->getFieldAmount (sfTakerGets).issue(), - data->getFieldAmount (sfTakerPays).issue()}); + Book b{data->getFieldAmount(sfTakerGets).issue(), + data->getFieldAmount(sfTakerPays).issue()}; + auto listeners = getBookListeners(b); if (listeners) - listeners->publish (jvObj); + { + listeners->publish(jvObj, havePublished); + } } } } diff --git a/src/test/rpc/Book_test.cpp b/src/test/rpc/Book_test.cpp index 51cc15892..93ef81716 100644 --- a/src/test/rpc/Book_test.cpp +++ b/src/test/rpc/Book_test.cpp @@ -64,7 +64,7 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "XRP"; j[jss::taker_pays][jss::currency] = "USD"; @@ -147,7 +147,7 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "XRP"; j[jss::taker_pays][jss::currency] = "USD"; @@ -225,7 +225,7 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::both] = true; j[jss::taker_gets][jss::currency] = "XRP"; @@ -319,7 +319,7 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::both] = true; j[jss::taker_gets][jss::currency] = "XRP"; @@ -414,14 +414,14 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "XRP"; j[jss::taker_pays][jss::currency] = "USD"; j[jss::taker_pays][jss::issuer] = Account("alice").human(); } { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "CNY"; j[jss::taker_gets][jss::issuer] = Account("alice").human(); @@ -540,14 +540,14 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "XRP"; j[jss::taker_pays][jss::currency] = "USD"; j[jss::taker_pays][jss::issuer] = Account("alice").human(); } { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "CNY"; j[jss::taker_gets][jss::issuer] = Account("alice").human(); @@ -657,7 +657,7 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::both] = true; j[jss::taker_gets][jss::currency] = "XRP"; @@ -665,7 +665,7 @@ public: j[jss::taker_pays][jss::issuer] = Account("alice").human(); } { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::both] = true; j[jss::taker_gets][jss::currency] = "CNY"; @@ -804,7 +804,7 @@ public: // RPC subscribe to books stream books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::both] = true; j[jss::taker_gets][jss::currency] = "XRP"; @@ -813,7 +813,7 @@ public: } // RPC subscribe to books stream { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::both] = true; j[jss::taker_gets][jss::currency] = "CNY"; @@ -951,7 +951,7 @@ public: { books[jss::books] = Json::arrayValue; { - auto &j = books[jss::books].append(Json::objectValue); + auto& j = books[jss::books].append(Json::objectValue); j[jss::snapshot] = true; j[jss::taker_gets][jss::currency] = "XRP"; j[jss::taker_pays][jss::currency] = "USD"; @@ -1072,6 +1072,180 @@ public: BEAST_EXPECT(jv[jss::status] == "success"); } + // Check that a stream only sees the given OfferCreate once + static + bool + offerOnlyOnceInStream( + std::unique_ptr const & wsc, + std::chrono::milliseconds const& timeout, + jtx::PrettyAmount const& takerGets, + jtx::PrettyAmount const& takerPays) + { + auto maybeJv = wsc->getMsg(timeout); + // No message + if (!maybeJv) + return false; + // wrong message + if(!(*maybeJv).isMember(jss::transaction)) + return false; + auto const& t = (*maybeJv)[jss::transaction]; + if (t[jss::TransactionType] != "OfferCreate" || + t[jss::TakerGets] != takerGets.value().getJson(0) || + t[jss::TakerPays] != takerPays.value().getJson(0)) + return false; + // Make sure no other message is waiting + return wsc->getMsg(timeout) == boost::none; + }; + + void + testCrossingSingleBookOffer() + { + testcase("Crossing single book offer"); + + // This was added to check that an OfferCreate transaction is only + // published once in a stream, even if it updates multiple offer + // ledger entries + + using namespace jtx; + Env env(*this); + + // Scenario is: + // - Alice and Bob place identical offers for USD -> XRP + // - Charlie places a crossing order that takes both Alice and Bob's + + auto const gw = Account("gateway"); + auto const alice = Account("alice"); + auto const bob = Account("bob"); + auto const charlie = Account("charlie"); + auto const USD = gw["USD"]; + + env.fund (XRP(1000000), gw, alice, bob, charlie); + env.close(); + + env (trust(alice, USD(500))); + env (trust(bob, USD(500))); + env.close(); + + env (pay(gw, alice, USD(500))); + env (pay(gw, bob, USD(500))); + env.close(); + + // Alice and Bob offer $500 for 500 XRP + env (offer (alice, XRP(500), USD(500))); + env (offer (bob, XRP(500), USD(500))); + env.close(); + + auto wsc = makeWSClient(env.app().config()); + Json::Value books; + { + // RPC subscribe to books stream + books[jss::books] = Json::arrayValue; + { + auto& j = books[jss::books].append(Json::objectValue); + j[jss::snapshot] = false; + j[jss::taker_gets][jss::currency] = "XRP"; + j[jss::taker_pays][jss::currency] = "USD"; + j[jss::taker_pays][jss::issuer] = gw.human(); + } + + auto jv = wsc->invoke("subscribe", books); + if (!BEAST_EXPECT(jv[jss::status] == "success")) + return; + } + + // Charlie places an offer that crosses Alice and Charlie's offers + env(offer(charlie, USD(1000), XRP(1000))); + env.close(); + env.require(offers(alice, 0), offers(bob, 0), offers(charlie, 0)); + BEAST_EXPECT(offerOnlyOnceInStream(wsc, 1s, XRP(1000), USD(1000))); + + // RPC unsubscribe + auto jv = wsc->invoke("unsubscribe", books); + BEAST_EXPECT(jv[jss::status] == "success"); + } + + void + testCrossingMultiBookOffer() + { + testcase("Crossing multi-book offer"); + + // This was added to check that an OfferCreate transaction is only + // published once in a stream, even if it auto-bridges across several + // books that are under subscription + + using namespace jtx; + Env env(*this); + + // Scenario is: + // - Alice has 1 USD and wants 100 XRP + // - Bob has 100 XRP and wants 1 EUR + // - Charlie has 1 EUR and wants 1 USD and should auto-bridge through + // Alice and Bob + + auto const gw = Account("gateway"); + auto const alice = Account("alice"); + auto const bob = Account("bob"); + auto const charlie = Account("charlie"); + auto const USD = gw["USD"]; + auto const EUR = gw["EUR"]; + + env.fund(XRP(1000000), gw, alice, bob, charlie); + env.close(); + + for (auto const& account : {alice, bob, charlie}) + { + for (auto const& iou : {USD, EUR}) + { + env(trust(account, iou(1))); + } + } + env.close(); + + env(pay(gw, alice, USD(1))); + env(pay(gw, charlie, EUR(1))); + env.close(); + + env(offer(alice, XRP(100), USD(1))); + env(offer(bob, EUR(1), XRP(100))); + env.close(); + + auto wsc = makeWSClient(env.app().config()); + Json::Value books; + + { + // RPC subscribe to multiple book streams + books[jss::books] = Json::arrayValue; + { + auto& j = books[jss::books].append(Json::objectValue); + j[jss::snapshot] = false; + j[jss::taker_gets][jss::currency] = "XRP"; + j[jss::taker_pays][jss::currency] = "USD"; + j[jss::taker_pays][jss::issuer] = gw.human(); + } + + { + auto& j = books[jss::books].append(Json::objectValue); + j[jss::snapshot] = false; + j[jss::taker_gets][jss::currency] = "EUR"; + j[jss::taker_gets][jss::issuer] = gw.human(); + j[jss::taker_pays][jss::currency] = "XRP"; + } + + auto jv = wsc->invoke("subscribe", books); + if (!BEAST_EXPECT(jv[jss::status] == "success")) + return; + } + + // Charlies places an on offer for EUR -> USD that should auto-bridge + env(offer(charlie, USD(1), EUR(1))); + env.close(); + BEAST_EXPECT(offerOnlyOnceInStream(wsc, 1s, EUR(1), USD(1))); + + // RPC unsubscribe + auto jv = wsc->invoke("unsubscribe", books); + BEAST_EXPECT(jv[jss::status] == "success"); + } + void testBookOfferErrors() { @@ -1466,6 +1640,8 @@ public: testMultipleBooksBothSidesEmptyBook(); testMultipleBooksBothSidesOffersInBook(); testTrackOffers(); + testCrossingSingleBookOffer(); + testCrossingMultiBookOffer(); testBookOfferErrors(); testBookOfferLimits(true); testBookOfferLimits(false);