Prevent duplicate txs in book subscription (RIPD-1465):

If an offer transaction touched multiple ledger entries associated with the same
book, that offer transaction would be published multiple times to anyone subscribed
to that book stream.

Fixes #2095.
This commit is contained in:
Brad Chase
2017-04-28 16:36:35 -04:00
committed by Nik Bougalis
parent f2787dc35c
commit aa2ff00485
4 changed files with 252 additions and 37 deletions

View File

@@ -24,35 +24,44 @@
namespace ripple {
void BookListeners::addSubscriber (InfoSub::ref sub)
void
BookListeners::addSubscriber(InfoSub::ref sub)
{
std::lock_guard <std::recursive_mutex> sl (mLock);
mListeners[sub->getSeq ()] = sub;
std::lock_guard<std::recursive_mutex> sl(mLock);
mListeners[sub->getSeq()] = sub;
}
void BookListeners::removeSubscriber (std::uint64_t seq)
void
BookListeners::removeSubscriber(std::uint64_t seq)
{
std::lock_guard <std::recursive_mutex> sl (mLock);
mListeners.erase (seq);
std::lock_guard<std::recursive_mutex> sl(mLock);
mListeners.erase(seq);
}
void BookListeners::publish (Json::Value const& jvObj)
void
BookListeners::publish(
Json::Value const& jvObj,
hash_set<std::uint64_t>& havePublished)
{
std::lock_guard <std::recursive_mutex> sl (mLock);
auto it = mListeners.cbegin ();
std::lock_guard<std::recursive_mutex> sl(mLock);
auto it = mListeners.cbegin();
while (it != mListeners.cend ())
while (it != mListeners.cend())
{
InfoSub::pointer p = it->second.lock ();
InfoSub::pointer p = it->second.lock();
if (p)
{
p->send (jvObj, true);
// Only publish jvObj if this is the first occurence
if(havePublished.emplace(p->getSeq()).second)
{
p->send(jvObj, true);
}
++it;
}
else
it = mListeners.erase (it);
it = mListeners.erase(it);
}
}
} // ripple
} // namespace ripple

View File

@@ -32,11 +32,33 @@ class BookListeners
public:
using pointer = std::shared_ptr<BookListeners>;
BookListeners () {}
BookListeners()
{
}
void addSubscriber (InfoSub::ref sub);
void removeSubscriber (std::uint64_t sub);
void publish (Json::Value const& jvObj);
/** Add a new subscription for this book
*/
void
addSubscriber(InfoSub::ref sub);
/** Stop publishing to a subscriber
*/
void
removeSubscriber(std::uint64_t sub);
/** Publish a transaction to subscribers
Publish a transaction to clients subscribed to changes on this book.
Uses havePublished to prevent sending duplicate transactions to clients
that have subscribed to multiple books.
@param jvObj JSON transaction data to publish
@param havePublished InfoSub sequence numbers that have already
published this transaction.
*/
void
publish(Json::Value const& jvObj, hash_set<std::uint64_t>& havePublished);
private:
std::recursive_mutex mLock;
@@ -44,6 +66,6 @@ private:
hash_map<std::uint64_t, InfoSub::wptr> mListeners;
};
} // ripple
} // namespace ripple
#endif

View File

@@ -239,9 +239,15 @@ void OrderBookDB::processTxn (
const AcceptedLedgerTx& alTx, Json::Value const& jvObj)
{
std::lock_guard <std::recursive_mutex> sl (mLock);
if (alTx.getResult () == tesSUCCESS)
{
// For this particular transaction, maintain the set of unique
// subscriptions that have already published it. This prevents sending
// the transaction multiple times if it touches multiple ltOFFER
// entries for the same book, or if it touches multiple books and a
// single client has subscribed to those books.
hash_set<std::uint64_t> havePublished;
// Check if this is an offer or an offer cancel or a payment that
// consumes an offer.
// Check to see what the meta looks like.
@@ -272,12 +278,14 @@ void OrderBookDB::processTxn (
data->isFieldPresent (sfTakerGets))
{
// determine the OrderBook
auto listeners = getBookListeners (
{data->getFieldAmount (sfTakerGets).issue(),
data->getFieldAmount (sfTakerPays).issue()});
Book b{data->getFieldAmount(sfTakerGets).issue(),
data->getFieldAmount(sfTakerPays).issue()};
auto listeners = getBookListeners(b);
if (listeners)
listeners->publish (jvObj);
{
listeners->publish(jvObj, havePublished);
}
}
}
}

View File

@@ -64,7 +64,7 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
@@ -147,7 +147,7 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
@@ -225,7 +225,7 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::both] = true;
j[jss::taker_gets][jss::currency] = "XRP";
@@ -319,7 +319,7 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::both] = true;
j[jss::taker_gets][jss::currency] = "XRP";
@@ -414,14 +414,14 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
j[jss::taker_pays][jss::issuer] = Account("alice").human();
}
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "CNY";
j[jss::taker_gets][jss::issuer] = Account("alice").human();
@@ -540,14 +540,14 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
j[jss::taker_pays][jss::issuer] = Account("alice").human();
}
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "CNY";
j[jss::taker_gets][jss::issuer] = Account("alice").human();
@@ -657,7 +657,7 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::both] = true;
j[jss::taker_gets][jss::currency] = "XRP";
@@ -665,7 +665,7 @@ public:
j[jss::taker_pays][jss::issuer] = Account("alice").human();
}
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::both] = true;
j[jss::taker_gets][jss::currency] = "CNY";
@@ -804,7 +804,7 @@ public:
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::both] = true;
j[jss::taker_gets][jss::currency] = "XRP";
@@ -813,7 +813,7 @@ public:
}
// RPC subscribe to books stream
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::both] = true;
j[jss::taker_gets][jss::currency] = "CNY";
@@ -951,7 +951,7 @@ public:
{
books[jss::books] = Json::arrayValue;
{
auto &j = books[jss::books].append(Json::objectValue);
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = true;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
@@ -1072,6 +1072,180 @@ public:
BEAST_EXPECT(jv[jss::status] == "success");
}
// Check that a stream only sees the given OfferCreate once
static
bool
offerOnlyOnceInStream(
std::unique_ptr<WSClient> const & wsc,
std::chrono::milliseconds const& timeout,
jtx::PrettyAmount const& takerGets,
jtx::PrettyAmount const& takerPays)
{
auto maybeJv = wsc->getMsg(timeout);
// No message
if (!maybeJv)
return false;
// wrong message
if(!(*maybeJv).isMember(jss::transaction))
return false;
auto const& t = (*maybeJv)[jss::transaction];
if (t[jss::TransactionType] != "OfferCreate" ||
t[jss::TakerGets] != takerGets.value().getJson(0) ||
t[jss::TakerPays] != takerPays.value().getJson(0))
return false;
// Make sure no other message is waiting
return wsc->getMsg(timeout) == boost::none;
};
void
testCrossingSingleBookOffer()
{
testcase("Crossing single book offer");
// This was added to check that an OfferCreate transaction is only
// published once in a stream, even if it updates multiple offer
// ledger entries
using namespace jtx;
Env env(*this);
// Scenario is:
// - Alice and Bob place identical offers for USD -> XRP
// - Charlie places a crossing order that takes both Alice and Bob's
auto const gw = Account("gateway");
auto const alice = Account("alice");
auto const bob = Account("bob");
auto const charlie = Account("charlie");
auto const USD = gw["USD"];
env.fund (XRP(1000000), gw, alice, bob, charlie);
env.close();
env (trust(alice, USD(500)));
env (trust(bob, USD(500)));
env.close();
env (pay(gw, alice, USD(500)));
env (pay(gw, bob, USD(500)));
env.close();
// Alice and Bob offer $500 for 500 XRP
env (offer (alice, XRP(500), USD(500)));
env (offer (bob, XRP(500), USD(500)));
env.close();
auto wsc = makeWSClient(env.app().config());
Json::Value books;
{
// RPC subscribe to books stream
books[jss::books] = Json::arrayValue;
{
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = false;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
j[jss::taker_pays][jss::issuer] = gw.human();
}
auto jv = wsc->invoke("subscribe", books);
if (!BEAST_EXPECT(jv[jss::status] == "success"))
return;
}
// Charlie places an offer that crosses Alice and Charlie's offers
env(offer(charlie, USD(1000), XRP(1000)));
env.close();
env.require(offers(alice, 0), offers(bob, 0), offers(charlie, 0));
BEAST_EXPECT(offerOnlyOnceInStream(wsc, 1s, XRP(1000), USD(1000)));
// RPC unsubscribe
auto jv = wsc->invoke("unsubscribe", books);
BEAST_EXPECT(jv[jss::status] == "success");
}
void
testCrossingMultiBookOffer()
{
testcase("Crossing multi-book offer");
// This was added to check that an OfferCreate transaction is only
// published once in a stream, even if it auto-bridges across several
// books that are under subscription
using namespace jtx;
Env env(*this);
// Scenario is:
// - Alice has 1 USD and wants 100 XRP
// - Bob has 100 XRP and wants 1 EUR
// - Charlie has 1 EUR and wants 1 USD and should auto-bridge through
// Alice and Bob
auto const gw = Account("gateway");
auto const alice = Account("alice");
auto const bob = Account("bob");
auto const charlie = Account("charlie");
auto const USD = gw["USD"];
auto const EUR = gw["EUR"];
env.fund(XRP(1000000), gw, alice, bob, charlie);
env.close();
for (auto const& account : {alice, bob, charlie})
{
for (auto const& iou : {USD, EUR})
{
env(trust(account, iou(1)));
}
}
env.close();
env(pay(gw, alice, USD(1)));
env(pay(gw, charlie, EUR(1)));
env.close();
env(offer(alice, XRP(100), USD(1)));
env(offer(bob, EUR(1), XRP(100)));
env.close();
auto wsc = makeWSClient(env.app().config());
Json::Value books;
{
// RPC subscribe to multiple book streams
books[jss::books] = Json::arrayValue;
{
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = false;
j[jss::taker_gets][jss::currency] = "XRP";
j[jss::taker_pays][jss::currency] = "USD";
j[jss::taker_pays][jss::issuer] = gw.human();
}
{
auto& j = books[jss::books].append(Json::objectValue);
j[jss::snapshot] = false;
j[jss::taker_gets][jss::currency] = "EUR";
j[jss::taker_gets][jss::issuer] = gw.human();
j[jss::taker_pays][jss::currency] = "XRP";
}
auto jv = wsc->invoke("subscribe", books);
if (!BEAST_EXPECT(jv[jss::status] == "success"))
return;
}
// Charlies places an on offer for EUR -> USD that should auto-bridge
env(offer(charlie, USD(1), EUR(1)));
env.close();
BEAST_EXPECT(offerOnlyOnceInStream(wsc, 1s, EUR(1), USD(1)));
// RPC unsubscribe
auto jv = wsc->invoke("unsubscribe", books);
BEAST_EXPECT(jv[jss::status] == "success");
}
void
testBookOfferErrors()
{
@@ -1466,6 +1640,8 @@ public:
testMultipleBooksBothSidesEmptyBook();
testMultipleBooksBothSidesOffersInBook();
testTrackOffers();
testCrossingSingleBookOffer();
testCrossingMultiBookOffer();
testBookOfferErrors();
testBookOfferLimits(true);
testBookOfferLimits(false);