mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-27 14:35:52 +00:00
Rewrite the notification code to use smart pointers. This fixes several
fatal race conditions in notifications. This makes failure to remove a notification non-fatal (it will remove itself harmlessly when an attempt is made to notify it).
This commit is contained in:
@@ -40,6 +40,9 @@ NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedge
|
||||
{
|
||||
}
|
||||
|
||||
uint64 InfoSub::sSeq = 0;
|
||||
boost::mutex InfoSub::sSeqLock;
|
||||
|
||||
std::string NetworkOPs::strOperatingMode()
|
||||
{
|
||||
static const char* paStatusToken[] = {
|
||||
@@ -1028,10 +1031,19 @@ void NetworkOPs::pubServer()
|
||||
jvObj["load_base"] = (mLastLoadBase = theApp->getFeeTrack().getLoadBase());
|
||||
jvObj["load_factor"] = (mLastLoadFactor = theApp->getFeeTrack().getLoadFactor());
|
||||
|
||||
BOOST_FOREACH(InfoSub* ispListener, mSubServer)
|
||||
NetworkOPs::subMapType::const_iterator it = mSubServer.begin();
|
||||
while (it != mSubServer.end())
|
||||
{
|
||||
ispListener->send(jvObj, true);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
p->send(jvObj, true);
|
||||
++it;
|
||||
}
|
||||
else
|
||||
it = mSubServer.erase(it);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1285,9 +1297,17 @@ void NetworkOPs::pubProposedTransaction(Ledger::ref lpCurrent, const SerializedT
|
||||
|
||||
{
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
BOOST_FOREACH(InfoSub* ispListener, mSubRTTransactions)
|
||||
NetworkOPs::subMapType::const_iterator it = mSubRTTransactions.begin();
|
||||
while (it != mSubRTTransactions.end())
|
||||
{
|
||||
ispListener->send(jvObj, true);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
p->send(jvObj, true);
|
||||
++it;
|
||||
}
|
||||
else
|
||||
it = mSubRTTransactions.erase(it);
|
||||
}
|
||||
}
|
||||
TransactionMetaSet::pointer ret;
|
||||
@@ -1316,15 +1336,23 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted)
|
||||
jvObj["reserve_base"] = Json::UInt(lpAccepted->getReserve(0));
|
||||
jvObj["reserve_inc"] = Json::UInt(lpAccepted->getReserveInc());
|
||||
|
||||
BOOST_FOREACH(InfoSub* ispListener, mSubLedger)
|
||||
NetworkOPs::subMapType::const_iterator it = mSubLedger.begin();
|
||||
while (it != mSubLedger.end())
|
||||
{
|
||||
ispListener->send(jvObj, true);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
p->send(jvObj, true);
|
||||
++it;
|
||||
}
|
||||
else
|
||||
it = mSubLedger.erase(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Don't lock since pubAcceptedTransaction is locking.
|
||||
if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() )
|
||||
if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty())
|
||||
{
|
||||
SHAMap& txSet = *lpAccepted->peekTransactionMap();
|
||||
|
||||
@@ -1390,14 +1418,30 @@ void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedT
|
||||
{
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
|
||||
BOOST_FOREACH(InfoSub* ispListener, mSubTransactions)
|
||||
NetworkOPs::subMapType::const_iterator it = mSubTransactions.begin();
|
||||
while (it != mSubTransactions.end())
|
||||
{
|
||||
ispListener->send(jvObj, true);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
p->send(jvObj, true);
|
||||
++it;
|
||||
}
|
||||
else
|
||||
it = mSubTransactions.erase(it);
|
||||
}
|
||||
|
||||
BOOST_FOREACH(InfoSub* ispListener, mSubRTTransactions)
|
||||
it = mSubRTTransactions.begin();
|
||||
while (it != mSubRTTransactions.end())
|
||||
{
|
||||
ispListener->send(jvObj, true);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
p->send(jvObj, true);
|
||||
++it;
|
||||
}
|
||||
else
|
||||
it = mSubRTTransactions.erase(it);
|
||||
}
|
||||
}
|
||||
theApp->getOrderBookDB().processTxn(stTxn, terResult, meta, jvObj);
|
||||
@@ -1407,7 +1451,7 @@ void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedT
|
||||
|
||||
void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, bool bAccepted, TransactionMetaSet::pointer& meta)
|
||||
{
|
||||
boost::unordered_set<InfoSub*> notify;
|
||||
boost::unordered_set<InfoSub::pointer> notify;
|
||||
int iProposed = 0;
|
||||
int iAccepted = 0;
|
||||
|
||||
@@ -1425,10 +1469,18 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr
|
||||
|
||||
if (simiIt != mSubRTAccount.end())
|
||||
{
|
||||
BOOST_FOREACH(InfoSub* ispListener, simiIt->second)
|
||||
NetworkOPs::subMapType::const_iterator it = simiIt->second.begin();
|
||||
while (it != simiIt->second.end())
|
||||
{
|
||||
++iProposed;
|
||||
notify.insert(ispListener);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
notify.insert(p);
|
||||
++it;
|
||||
++iProposed;
|
||||
}
|
||||
else
|
||||
it = simiIt->second.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1438,10 +1490,18 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr
|
||||
|
||||
if (simiIt != mSubAccount.end())
|
||||
{
|
||||
BOOST_FOREACH(InfoSub* ispListener, simiIt->second)
|
||||
NetworkOPs::subMapType::const_iterator it = simiIt->second.begin();
|
||||
while (it != simiIt->second.end())
|
||||
{
|
||||
++iAccepted;
|
||||
notify.insert(ispListener);
|
||||
InfoSub::pointer p = it->second.lock();
|
||||
if (p)
|
||||
{
|
||||
notify.insert(p);
|
||||
++it;
|
||||
++iAccepted;
|
||||
}
|
||||
else
|
||||
it = simiIt->second.erase(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1450,16 +1510,15 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr
|
||||
}
|
||||
cLog(lsINFO) << boost::str(boost::format("pubAccountTransaction: iProposed=%d iAccepted=%d") % iProposed % iAccepted);
|
||||
|
||||
// FIXME: This can crash. An InfoSub can go away while we hold a regular pointer to it.
|
||||
if (!notify.empty())
|
||||
{
|
||||
Json::Value jvObj = transJson(stTxn, terResult, bAccepted, lpCurrent, "account");
|
||||
|
||||
if (meta) jvObj["meta"] = meta->getJson(0);
|
||||
|
||||
BOOST_FOREACH(InfoSub* ispListener, notify)
|
||||
BOOST_FOREACH(InfoSub::ref isrListener, notify)
|
||||
{
|
||||
ispListener->send(jvObj, true);
|
||||
isrListener->send(jvObj, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1468,7 +1527,7 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr
|
||||
// Monitoring
|
||||
//
|
||||
|
||||
void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, uint32 uLedgerIndex, bool rt)
|
||||
void NetworkOPs::subAccount(InfoSub::ref isrListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, uint32 uLedgerIndex, bool rt)
|
||||
{
|
||||
subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
|
||||
|
||||
@@ -1477,7 +1536,7 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<Rip
|
||||
{
|
||||
cLog(lsTRACE) << boost::str(boost::format("subAccount: account: %d") % naAccountID.humanAccountID());
|
||||
|
||||
ispListener->insertSubAccountInfo(naAccountID, uLedgerIndex);
|
||||
isrListener->insertSubAccountInfo(naAccountID, uLedgerIndex);
|
||||
}
|
||||
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
@@ -1488,20 +1547,19 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<Rip
|
||||
if (simIterator == subMap.end())
|
||||
{
|
||||
// Not found, note that account has a new single listner.
|
||||
boost::unordered_set<InfoSub*> usisElement;
|
||||
|
||||
usisElement.insert(ispListener);
|
||||
subMapType usisElement;
|
||||
usisElement[isrListener->getSeq()] = isrListener;
|
||||
subMap.insert(simIterator, make_pair(naAccountID.getAccountID(), usisElement));
|
||||
}
|
||||
else
|
||||
{
|
||||
// Found, note that the account has another listener.
|
||||
simIterator->second.insert(ispListener);
|
||||
simIterator->second[isrListener->getSeq()] = isrListener;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, bool rt)
|
||||
void NetworkOPs::unsubAccount(uint64 uSeq, const boost::unordered_set<RippleAddress>& vnaAccountIDs, bool rt)
|
||||
{
|
||||
subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
|
||||
|
||||
@@ -1509,7 +1567,7 @@ void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set<R
|
||||
// FIXME: Don't we need to unsub?
|
||||
// BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs)
|
||||
// {
|
||||
// ispListener->deleteSubAccountInfo(naAccountID);
|
||||
// isrListener->deleteSubAccountInfo(naAccountID);
|
||||
// }
|
||||
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
@@ -1527,7 +1585,7 @@ void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set<R
|
||||
else
|
||||
{
|
||||
// Found
|
||||
simIterator->second.erase(ispListener);
|
||||
simIterator->second.erase(uSeq);
|
||||
|
||||
if (simIterator->second.empty())
|
||||
{
|
||||
@@ -1538,17 +1596,17 @@ void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set<R
|
||||
}
|
||||
}
|
||||
|
||||
bool NetworkOPs::subBook(InfoSub* ispListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut)
|
||||
bool NetworkOPs::subBook(InfoSub::ref isrListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut)
|
||||
{
|
||||
BookListeners::pointer listeners=theApp->getOrderBookDB().makeBookListeners(currencyIn, currencyOut, issuerIn, issuerOut);
|
||||
if(listeners) listeners->addSubscriber(ispListener);
|
||||
if(listeners) listeners->addSubscriber(isrListener);
|
||||
return(true);
|
||||
}
|
||||
|
||||
bool NetworkOPs::unsubBook(InfoSub* ispListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut)
|
||||
bool NetworkOPs::unsubBook(uint64 uSeq, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut)
|
||||
{
|
||||
BookListeners::pointer listeners=theApp->getOrderBookDB().getBookListeners(currencyIn, currencyOut, issuerIn, issuerOut);
|
||||
if(listeners) listeners->removeSubscriber(ispListener);
|
||||
if(listeners) listeners->removeSubscriber(uSeq);
|
||||
return(true);
|
||||
}
|
||||
|
||||
@@ -1578,26 +1636,26 @@ void NetworkOPs::storeProposal(LedgerProposal::ref proposal, const RippleAddress
|
||||
InfoSub::~InfoSub()
|
||||
{
|
||||
NetworkOPs& ops = theApp->getOPs();
|
||||
ops.unsubTransactions(this);
|
||||
ops.unsubRTTransactions(this);
|
||||
ops.unsubLedger(this);
|
||||
ops.unsubServer(this);
|
||||
ops.unsubAccount(this, mSubAccountInfo, true);
|
||||
ops.unsubAccount(this, mSubAccountInfo, false);
|
||||
ops.unsubTransactions(mSeq);
|
||||
ops.unsubRTTransactions(mSeq);
|
||||
ops.unsubLedger(mSeq);
|
||||
ops.unsubServer(mSeq);
|
||||
ops.unsubAccount(mSeq, mSubAccountInfo, true);
|
||||
ops.unsubAccount(mSeq, mSubAccountInfo, false);
|
||||
}
|
||||
|
||||
#if 0
|
||||
void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash)
|
||||
void NetworkOPs::subAccountChanges(InfoSub* isrListener, const uint256 uLedgerHash)
|
||||
{
|
||||
}
|
||||
|
||||
void NetworkOPs::unsubAccountChanges(InfoSub* ispListener)
|
||||
void NetworkOPs::unsubAccountChanges(InfoSub* isrListener)
|
||||
{
|
||||
}
|
||||
#endif
|
||||
|
||||
// <-- bool: true=added, false=already there
|
||||
bool NetworkOPs::subLedger(InfoSub* ispListener, Json::Value& jvResult)
|
||||
bool NetworkOPs::subLedger(InfoSub::ref isrListener, Json::Value& jvResult)
|
||||
{
|
||||
Ledger::pointer lpClosed = getClosedLedger();
|
||||
|
||||
@@ -1610,17 +1668,17 @@ bool NetworkOPs::subLedger(InfoSub* ispListener, Json::Value& jvResult)
|
||||
jvResult["reserve_base"] = Json::UInt(lpClosed->getReserve(0));
|
||||
jvResult["reserve_inc"] = Json::UInt(lpClosed->getReserveInc());
|
||||
|
||||
return mSubLedger.insert(ispListener).second;
|
||||
return mSubLedger.insert(std::make_pair(isrListener->getSeq(), isrListener)).second;
|
||||
}
|
||||
|
||||
// <-- bool: true=erased, false=was not there
|
||||
bool NetworkOPs::unsubLedger(InfoSub* ispListener)
|
||||
bool NetworkOPs::unsubLedger(uint64 uSeq)
|
||||
{
|
||||
return !!mSubLedger.erase(ispListener);
|
||||
return !!mSubLedger.erase(uSeq);
|
||||
}
|
||||
|
||||
// <-- bool: true=added, false=already there
|
||||
bool NetworkOPs::subServer(InfoSub* ispListener, Json::Value& jvResult)
|
||||
bool NetworkOPs::subServer(InfoSub::ref isrListener, Json::Value& jvResult)
|
||||
{
|
||||
uint256 uRandom;
|
||||
|
||||
@@ -1636,60 +1694,50 @@ bool NetworkOPs::subServer(InfoSub* ispListener, Json::Value& jvResult)
|
||||
jvResult["load_base"] = theApp->getFeeTrack().getLoadBase();
|
||||
jvResult["load_factor"] = theApp->getFeeTrack().getLoadFactor();
|
||||
|
||||
return mSubServer.insert(ispListener).second;
|
||||
return mSubServer.insert(std::make_pair(isrListener->getSeq(), isrListener)).second;
|
||||
}
|
||||
|
||||
// <-- bool: true=erased, false=was not there
|
||||
bool NetworkOPs::unsubServer(InfoSub* ispListener)
|
||||
bool NetworkOPs::unsubServer(uint64 uSeq)
|
||||
{
|
||||
return !!mSubServer.erase(ispListener);
|
||||
return !!mSubServer.erase(uSeq);
|
||||
}
|
||||
|
||||
// <-- bool: true=added, false=already there
|
||||
bool NetworkOPs::subTransactions(InfoSub* ispListener)
|
||||
bool NetworkOPs::subTransactions(InfoSub::ref isrListener)
|
||||
{
|
||||
return mSubTransactions.insert(ispListener).second;
|
||||
return mSubTransactions.insert(std::make_pair(isrListener->getSeq(), isrListener)).second;
|
||||
}
|
||||
|
||||
// <-- bool: true=erased, false=was not there
|
||||
bool NetworkOPs::unsubTransactions(InfoSub* ispListener)
|
||||
bool NetworkOPs::unsubTransactions(uint64 uSeq)
|
||||
{
|
||||
return !!mSubTransactions.erase(ispListener);
|
||||
return !!mSubTransactions.erase(uSeq);
|
||||
}
|
||||
|
||||
// <-- bool: true=added, false=already there
|
||||
bool NetworkOPs::subRTTransactions(InfoSub* ispListener)
|
||||
bool NetworkOPs::subRTTransactions(InfoSub::ref isrListener)
|
||||
{
|
||||
return mSubTransactions.insert(ispListener).second;
|
||||
return mSubTransactions.insert(std::make_pair(isrListener->getSeq(), isrListener)).second;
|
||||
}
|
||||
|
||||
// <-- bool: true=erased, false=was not there
|
||||
bool NetworkOPs::unsubRTTransactions(InfoSub* ispListener)
|
||||
bool NetworkOPs::unsubRTTransactions(uint64 uSeq)
|
||||
{
|
||||
return !!mSubTransactions.erase(ispListener);
|
||||
return !!mSubTransactions.erase(uSeq);
|
||||
}
|
||||
|
||||
RPCSub* NetworkOPs::findRpcSub(const std::string& strUrl)
|
||||
InfoSub::pointer NetworkOPs::findRpcSub(const std::string& strUrl)
|
||||
{
|
||||
RPCSub* rspResult;
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
|
||||
subRpcMapType::iterator it;
|
||||
|
||||
it = mRpcSubMap.find(strUrl);
|
||||
if (it == mRpcSubMap.end())
|
||||
{
|
||||
rspResult = (RPCSub*)(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
rspResult = it->second;
|
||||
}
|
||||
|
||||
return rspResult;
|
||||
subRpcMapType::iterator it = mRpcSubMap.find(strUrl);
|
||||
if (it != mRpcSubMap.end())
|
||||
return it->second;
|
||||
return InfoSub::pointer();
|
||||
}
|
||||
|
||||
RPCSub* NetworkOPs::addRpcSub(const std::string& strUrl, RPCSub* rspEntry)
|
||||
InfoSub::pointer NetworkOPs::addRpcSub(const std::string& strUrl, InfoSub::ref rspEntry)
|
||||
{
|
||||
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user