Add tests for subscribe/unsubscribe error cases:

Fixes: RIPD-1417

Fix incorrect error case messages. Fix crash in NetworkOps instance when
exiting with remaining RPC subscriptions. Add code to remove URL
subscription when requested.
This commit is contained in:
Mike Ellery
2017-06-19 14:31:22 -07:00
committed by seelabs
parent e0168b98d7
commit ac1ab720c4
5 changed files with 367 additions and 58 deletions

View File

@@ -217,6 +217,10 @@ public:
~NetworkOPsImp() override
{
jobCounter_.join();
// this clear() is necessary to ensure the shared_ptrs in this map get
// destroyed NOW because the objects in this map invoke methods on this
// class when they are destroyed
mRpcSubMap.clear();
}
public:
@@ -477,6 +481,7 @@ public:
InfoSub::pointer findRpcSub (std::string const& strUrl) override;
InfoSub::pointer addRpcSub (
std::string const& strUrl, InfoSub::ref) override;
bool tryRemoveRpcSub (std::string const& strUrl) override;
//--------------------------------------------------------------------------
//
@@ -554,13 +559,20 @@ private:
subRpcMapType mRpcSubMap;
SubMapType mSubLedger; // Accepted ledgers.
SubMapType mSubManifests; // Received validator manifests.
SubMapType mSubServer; // When server changes connectivity state.
SubMapType mSubTransactions; // All accepted transactions.
SubMapType mSubRTTransactions; // All proposed and accepted transactions.
SubMapType mSubValidations; // Received validations.
SubMapType mSubPeerStatus; // peer status changes
enum SubTypes
{
sLedger, // Accepted ledgers.
sManifests, // Received validator manifests.
sServer, // When server changes connectivity state.
sTransactions, // All accepted transactions.
sRTTransactions, // All proposed and accepted transactions.
sValidations, // Received validations.
sPeerStatus, // Peer status changes.
sLastEntry = sPeerStatus // as this name implies, any new entry must
// be ADDED ABOVE this one
};
std::array<SubMapType, SubTypes::sLastEntry+1> mStreamMaps;
ServerFeeSummary mLastFeeSummary;
@@ -1593,7 +1605,7 @@ void NetworkOPsImp::pubManifest (Manifest const& mo)
// VFALCO consider std::shared_mutex
ScopedLockType sl (mSubLock);
if (!mSubManifests.empty ())
if (!mStreamMaps[sManifests].empty ())
{
Json::Value jvObj (Json::objectValue);
@@ -1606,7 +1618,8 @@ void NetworkOPsImp::pubManifest (Manifest const& mo)
jvObj [jss::signature] = strHex (mo.getSignature ());
jvObj [jss::master_signature] = strHex (mo.getMasterSignature ());
for (auto i = mSubManifests.begin (); i != mSubManifests.end (); )
for (auto i = mStreamMaps[sManifests].begin ();
i != mStreamMaps[sManifests].end (); )
{
if (auto p = i->second.lock())
{
@@ -1615,7 +1628,7 @@ void NetworkOPsImp::pubManifest (Manifest const& mo)
}
else
{
i = mSubManifests.erase (i);
i = mStreamMaps[sManifests].erase (i);
}
}
}
@@ -1661,7 +1674,7 @@ void NetworkOPsImp::pubServer ()
//
ScopedLockType sl (mSubLock);
if (!mSubServer.empty ())
if (!mStreamMaps[sServer].empty ())
{
Json::Value jvObj (Json::objectValue);
@@ -1704,7 +1717,8 @@ void NetworkOPsImp::pubServer ()
mLastFeeSummary = f;
for (auto i = mSubServer.begin (); i != mSubServer.end (); )
for (auto i = mStreamMaps[sServer].begin ();
i != mStreamMaps[sServer].end (); )
{
InfoSub::pointer p = i->second.lock ();
@@ -1718,7 +1732,7 @@ void NetworkOPsImp::pubServer ()
}
else
{
i = mSubServer.erase (i);
i = mStreamMaps[sServer].erase (i);
}
}
}
@@ -1730,7 +1744,7 @@ void NetworkOPsImp::pubValidation (STValidation::ref val)
// VFALCO consider std::shared_mutex
ScopedLockType sl (mSubLock);
if (!mSubValidations.empty ())
if (!mStreamMaps[sValidations].empty ())
{
Json::Value jvObj (Json::objectValue);
@@ -1769,7 +1783,8 @@ void NetworkOPsImp::pubValidation (STValidation::ref val)
if (auto const reserveInc = (*val)[~sfReserveIncrement])
jvObj [jss::reserve_inc] = *reserveInc;
for (auto i = mSubValidations.begin (); i != mSubValidations.end (); )
for (auto i = mStreamMaps[sValidations].begin ();
i != mStreamMaps[sValidations].end (); )
{
if (auto p = i->second.lock())
{
@@ -1778,7 +1793,7 @@ void NetworkOPsImp::pubValidation (STValidation::ref val)
}
else
{
i = mSubValidations.erase (i);
i = mStreamMaps[sValidations].erase (i);
}
}
}
@@ -1789,13 +1804,14 @@ void NetworkOPsImp::pubPeerStatus (
{
ScopedLockType sl (mSubLock);
if (!mSubPeerStatus.empty ())
if (!mStreamMaps[sPeerStatus].empty ())
{
Json::Value jvObj (func());
jvObj [jss::type] = "peerStatusChange";
for (auto i = mSubPeerStatus.begin (); i != mSubPeerStatus.end (); )
for (auto i = mStreamMaps[sPeerStatus].begin ();
i != mStreamMaps[sPeerStatus].end (); )
{
InfoSub::pointer p = i->second.lock ();
@@ -1806,7 +1822,7 @@ void NetworkOPsImp::pubPeerStatus (
}
else
{
i = mSubValidations.erase (i);
i = mStreamMaps[sPeerStatus].erase (i);
}
}
}
@@ -2361,8 +2377,8 @@ void NetworkOPsImp::pubProposedTransaction (
{
ScopedLockType sl (mSubLock);
auto it = mSubRTTransactions.begin ();
while (it != mSubRTTransactions.end ())
auto it = mStreamMaps[sRTTransactions].begin ();
while (it != mStreamMaps[sRTTransactions].end ())
{
InfoSub::pointer p = it->second.lock ();
@@ -2373,7 +2389,7 @@ void NetworkOPsImp::pubProposedTransaction (
}
else
{
it = mSubRTTransactions.erase (it);
it = mStreamMaps[sRTTransactions].erase (it);
}
}
}
@@ -2402,7 +2418,7 @@ void NetworkOPsImp::pubLedger (
{
ScopedLockType sl (mSubLock);
if (!mSubLedger.empty ())
if (!mStreamMaps[sLedger].empty ())
{
Json::Value jvObj (Json::objectValue);
@@ -2426,8 +2442,8 @@ void NetworkOPsImp::pubLedger (
= app_.getLedgerMaster ().getCompleteLedgers ();
}
auto it = mSubLedger.begin ();
while (it != mSubLedger.end ())
auto it = mStreamMaps[sLedger].begin ();
while (it != mStreamMaps[sLedger].end ())
{
InfoSub::pointer p = it->second.lock ();
if (p)
@@ -2436,7 +2452,7 @@ void NetworkOPsImp::pubLedger (
++it;
}
else
it = mSubLedger.erase (it);
it = mStreamMaps[sLedger].erase (it);
}
}
}
@@ -2529,8 +2545,8 @@ void NetworkOPsImp::pubValidatedTransaction (
{
ScopedLockType sl (mSubLock);
auto it = mSubTransactions.begin ();
while (it != mSubTransactions.end ())
auto it = mStreamMaps[sTransactions].begin ();
while (it != mStreamMaps[sTransactions].end ())
{
InfoSub::pointer p = it->second.lock ();
@@ -2540,12 +2556,12 @@ void NetworkOPsImp::pubValidatedTransaction (
++it;
}
else
it = mSubTransactions.erase (it);
it = mStreamMaps[sTransactions].erase (it);
}
it = mSubRTTransactions.begin ();
it = mStreamMaps[sRTTransactions].begin ();
while (it != mSubRTTransactions.end ())
while (it != mStreamMaps[sRTTransactions].end ())
{
InfoSub::pointer p = it->second.lock ();
@@ -2555,7 +2571,7 @@ void NetworkOPsImp::pubValidatedTransaction (
++it;
}
else
it = mSubRTTransactions.erase (it);
it = mStreamMaps[sRTTransactions].erase (it);
}
}
app_.getOrderBookDB ().processTxn (alAccepted, alTx, jvObj);
@@ -2782,28 +2798,30 @@ bool NetworkOPsImp::subLedger (InfoSub::ref isrListener, Json::Value& jvResult)
}
ScopedLockType sl (mSubLock);
return mSubLedger.emplace (isrListener->getSeq (), isrListener).second;
return mStreamMaps[sLedger].emplace (
isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubLedger (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubLedger.erase (uSeq);
return mStreamMaps[sLedger].erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subManifests (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubManifests.emplace (isrListener->getSeq (), isrListener).second;
return mStreamMaps[sManifests].emplace (
isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubManifests (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubManifests.erase (uSeq);
return mStreamMaps[sManifests].erase (uSeq);
}
// <-- bool: true=added, false=already there
@@ -2832,21 +2850,22 @@ bool NetworkOPsImp::subServer (InfoSub::ref isrListener, Json::Value& jvResult,
app_.nodeIdentity().first);
ScopedLockType sl (mSubLock);
return mSubServer.emplace (isrListener->getSeq (), isrListener).second;
return mStreamMaps[sServer].emplace (
isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubServer (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubServer.erase (uSeq);
return mStreamMaps[sServer].erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subTransactions (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubTransactions.emplace (
return mStreamMaps[sTransactions].emplace (
isrListener->getSeq (), isrListener).second;
}
@@ -2854,14 +2873,14 @@ bool NetworkOPsImp::subTransactions (InfoSub::ref isrListener)
bool NetworkOPsImp::unsubTransactions (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubTransactions.erase (uSeq);
return mStreamMaps[sTransactions].erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subRTTransactions (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubRTTransactions.emplace (
return mStreamMaps[sRTTransactions].emplace (
isrListener->getSeq (), isrListener).second;
}
@@ -2869,35 +2888,37 @@ bool NetworkOPsImp::subRTTransactions (InfoSub::ref isrListener)
bool NetworkOPsImp::unsubRTTransactions (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubRTTransactions.erase (uSeq);
return mStreamMaps[sRTTransactions].erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subValidations (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubValidations.emplace (isrListener->getSeq (), isrListener).second;
return mStreamMaps[sValidations].emplace (
isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubValidations (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubValidations.erase (uSeq);
return mStreamMaps[sValidations].erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subPeerStatus (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubPeerStatus.emplace (isrListener->getSeq (), isrListener).second;
return mStreamMaps[sPeerStatus].emplace (
isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubPeerStatus.erase (uSeq);
return mStreamMaps[sPeerStatus].erase (uSeq);
}
InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
@@ -2922,6 +2943,25 @@ InfoSub::pointer NetworkOPsImp::addRpcSub (
return rspEntry;
}
bool NetworkOPsImp::tryRemoveRpcSub (std::string const& strUrl)
{
ScopedLockType sl (mSubLock);
auto pInfo = findRpcSub(strUrl);
if (!pInfo)
return false;
// check to see if any of the stream maps still hold a weak reference to
// this entry before removing
for (SubMapType const& map : mStreamMaps)
{
if (map.find(pInfo->getSeq()) != map.end())
return false;
}
mRpcSubMap.erase(strUrl);
return true;
}
#ifndef USE_NEW_BOOK_PAGE
// NIKB FIXME this should be looked at. There's no reason why this shouldn't