From c570cca15e1ab639852e08e79ff887498bd8de70 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Tue, 26 Feb 2013 00:11:23 -0800 Subject: [PATCH] Rewrite the notification code to use smart pointers. This fixes several fatal race conditions in notifications. This makes failure to remove a notification non-fatal (it will remove itself harmlessly when an attempt is made to notify it). --- src/cpp/ripple/NetworkOPs.cpp | 196 ++++++++++++++++++++------------- src/cpp/ripple/NetworkOPs.h | 70 +++++++----- src/cpp/ripple/OrderBookDB.cpp | 20 +++- src/cpp/ripple/OrderBookDB.h | 6 +- src/cpp/ripple/RPCHandler.cpp | 40 +++---- src/cpp/ripple/RPCHandler.h | 8 +- src/cpp/ripple/RPCSub.h | 3 + src/cpp/ripple/WSConnection.h | 6 +- 8 files changed, 209 insertions(+), 140 deletions(-) diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index d8c3cfdbd9..662bf28f21 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -40,6 +40,9 @@ NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedge { } +uint64 InfoSub::sSeq = 0; +boost::mutex InfoSub::sSeqLock; + std::string NetworkOPs::strOperatingMode() { static const char* paStatusToken[] = { @@ -1028,10 +1031,19 @@ void NetworkOPs::pubServer() jvObj["load_base"] = (mLastLoadBase = theApp->getFeeTrack().getLoadBase()); jvObj["load_factor"] = (mLastLoadFactor = theApp->getFeeTrack().getLoadFactor()); - BOOST_FOREACH(InfoSub* ispListener, mSubServer) + NetworkOPs::subMapType::const_iterator it = mSubServer.begin(); + while (it != mSubServer.end()) { - ispListener->send(jvObj, true); + InfoSub::pointer p = it->second.lock(); + if (p) + { + p->send(jvObj, true); + ++it; + } + else + it = mSubServer.erase(it); } + } } @@ -1285,9 +1297,17 @@ void NetworkOPs::pubProposedTransaction(Ledger::ref lpCurrent, const SerializedT { boost::recursive_mutex::scoped_lock sl(mMonitorLock); - BOOST_FOREACH(InfoSub* ispListener, mSubRTTransactions) + NetworkOPs::subMapType::const_iterator it = mSubRTTransactions.begin(); + while (it != mSubRTTransactions.end()) { - ispListener->send(jvObj, true); + InfoSub::pointer p = it->second.lock(); + if (p) + { + p->send(jvObj, true); + ++it; + } + else + it = mSubRTTransactions.erase(it); } } TransactionMetaSet::pointer ret; @@ -1316,15 +1336,23 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted) jvObj["reserve_base"] = Json::UInt(lpAccepted->getReserve(0)); jvObj["reserve_inc"] = Json::UInt(lpAccepted->getReserveInc()); - BOOST_FOREACH(InfoSub* ispListener, mSubLedger) + NetworkOPs::subMapType::const_iterator it = mSubLedger.begin(); + while (it != mSubLedger.end()) { - ispListener->send(jvObj, true); + InfoSub::pointer p = it->second.lock(); + if (p) + { + p->send(jvObj, true); + ++it; + } + else + it = mSubLedger.erase(it); } } } // Don't lock since pubAcceptedTransaction is locking. - if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() ) + if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty()) { SHAMap& txSet = *lpAccepted->peekTransactionMap(); @@ -1390,14 +1418,30 @@ void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedT { boost::recursive_mutex::scoped_lock sl(mMonitorLock); - BOOST_FOREACH(InfoSub* ispListener, mSubTransactions) + NetworkOPs::subMapType::const_iterator it = mSubTransactions.begin(); + while (it != mSubTransactions.end()) { - ispListener->send(jvObj, true); + InfoSub::pointer p = it->second.lock(); + if (p) + { + p->send(jvObj, true); + ++it; + } + else + it = mSubTransactions.erase(it); } - BOOST_FOREACH(InfoSub* ispListener, mSubRTTransactions) + it = mSubRTTransactions.begin(); + while (it != mSubRTTransactions.end()) { - ispListener->send(jvObj, true); + InfoSub::pointer p = it->second.lock(); + if (p) + { + p->send(jvObj, true); + ++it; + } + else + it = mSubRTTransactions.erase(it); } } theApp->getOrderBookDB().processTxn(stTxn, terResult, meta, jvObj); @@ -1407,7 +1451,7 @@ void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedT void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, bool bAccepted, TransactionMetaSet::pointer& meta) { - boost::unordered_set notify; + boost::unordered_set notify; int iProposed = 0; int iAccepted = 0; @@ -1425,10 +1469,18 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr if (simiIt != mSubRTAccount.end()) { - BOOST_FOREACH(InfoSub* ispListener, simiIt->second) + NetworkOPs::subMapType::const_iterator it = simiIt->second.begin(); + while (it != simiIt->second.end()) { - ++iProposed; - notify.insert(ispListener); + InfoSub::pointer p = it->second.lock(); + if (p) + { + notify.insert(p); + ++it; + ++iProposed; + } + else + it = simiIt->second.erase(it); } } @@ -1438,10 +1490,18 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr if (simiIt != mSubAccount.end()) { - BOOST_FOREACH(InfoSub* ispListener, simiIt->second) + NetworkOPs::subMapType::const_iterator it = simiIt->second.begin(); + while (it != simiIt->second.end()) { - ++iAccepted; - notify.insert(ispListener); + InfoSub::pointer p = it->second.lock(); + if (p) + { + notify.insert(p); + ++it; + ++iAccepted; + } + else + it = simiIt->second.erase(it); } } } @@ -1450,16 +1510,15 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr } cLog(lsINFO) << boost::str(boost::format("pubAccountTransaction: iProposed=%d iAccepted=%d") % iProposed % iAccepted); - // FIXME: This can crash. An InfoSub can go away while we hold a regular pointer to it. if (!notify.empty()) { Json::Value jvObj = transJson(stTxn, terResult, bAccepted, lpCurrent, "account"); if (meta) jvObj["meta"] = meta->getJson(0); - BOOST_FOREACH(InfoSub* ispListener, notify) + BOOST_FOREACH(InfoSub::ref isrListener, notify) { - ispListener->send(jvObj, true); + isrListener->send(jvObj, true); } } } @@ -1468,7 +1527,7 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr // Monitoring // -void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt) +void NetworkOPs::subAccount(InfoSub::ref isrListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt) { subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount; @@ -1477,7 +1536,7 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_setinsertSubAccountInfo(naAccountID, uLedgerIndex); + isrListener->insertSubAccountInfo(naAccountID, uLedgerIndex); } boost::recursive_mutex::scoped_lock sl(mMonitorLock); @@ -1488,20 +1547,19 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set usisElement; - - usisElement.insert(ispListener); + subMapType usisElement; + usisElement[isrListener->getSeq()] = isrListener; subMap.insert(simIterator, make_pair(naAccountID.getAccountID(), usisElement)); } else { // Found, note that the account has another listener. - simIterator->second.insert(ispListener); + simIterator->second[isrListener->getSeq()] = isrListener; } } } -void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, bool rt) +void NetworkOPs::unsubAccount(uint64 uSeq, const boost::unordered_set& vnaAccountIDs, bool rt) { subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount; @@ -1509,7 +1567,7 @@ void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_setdeleteSubAccountInfo(naAccountID); + // isrListener->deleteSubAccountInfo(naAccountID); // } boost::recursive_mutex::scoped_lock sl(mMonitorLock); @@ -1527,7 +1585,7 @@ void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_setsecond.erase(ispListener); + simIterator->second.erase(uSeq); if (simIterator->second.empty()) { @@ -1538,17 +1596,17 @@ void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_setgetOrderBookDB().makeBookListeners(currencyIn, currencyOut, issuerIn, issuerOut); - if(listeners) listeners->addSubscriber(ispListener); + if(listeners) listeners->addSubscriber(isrListener); return(true); } -bool NetworkOPs::unsubBook(InfoSub* ispListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut) +bool NetworkOPs::unsubBook(uint64 uSeq, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut) { BookListeners::pointer listeners=theApp->getOrderBookDB().getBookListeners(currencyIn, currencyOut, issuerIn, issuerOut); - if(listeners) listeners->removeSubscriber(ispListener); + if(listeners) listeners->removeSubscriber(uSeq); return(true); } @@ -1578,26 +1636,26 @@ void NetworkOPs::storeProposal(LedgerProposal::ref proposal, const RippleAddress InfoSub::~InfoSub() { NetworkOPs& ops = theApp->getOPs(); - ops.unsubTransactions(this); - ops.unsubRTTransactions(this); - ops.unsubLedger(this); - ops.unsubServer(this); - ops.unsubAccount(this, mSubAccountInfo, true); - ops.unsubAccount(this, mSubAccountInfo, false); + ops.unsubTransactions(mSeq); + ops.unsubRTTransactions(mSeq); + ops.unsubLedger(mSeq); + ops.unsubServer(mSeq); + ops.unsubAccount(mSeq, mSubAccountInfo, true); + ops.unsubAccount(mSeq, mSubAccountInfo, false); } #if 0 -void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash) +void NetworkOPs::subAccountChanges(InfoSub* isrListener, const uint256 uLedgerHash) { } -void NetworkOPs::unsubAccountChanges(InfoSub* ispListener) +void NetworkOPs::unsubAccountChanges(InfoSub* isrListener) { } #endif // <-- bool: true=added, false=already there -bool NetworkOPs::subLedger(InfoSub* ispListener, Json::Value& jvResult) +bool NetworkOPs::subLedger(InfoSub::ref isrListener, Json::Value& jvResult) { Ledger::pointer lpClosed = getClosedLedger(); @@ -1610,17 +1668,17 @@ bool NetworkOPs::subLedger(InfoSub* ispListener, Json::Value& jvResult) jvResult["reserve_base"] = Json::UInt(lpClosed->getReserve(0)); jvResult["reserve_inc"] = Json::UInt(lpClosed->getReserveInc()); - return mSubLedger.insert(ispListener).second; + return mSubLedger.insert(std::make_pair(isrListener->getSeq(), isrListener)).second; } // <-- bool: true=erased, false=was not there -bool NetworkOPs::unsubLedger(InfoSub* ispListener) +bool NetworkOPs::unsubLedger(uint64 uSeq) { - return !!mSubLedger.erase(ispListener); + return !!mSubLedger.erase(uSeq); } // <-- bool: true=added, false=already there -bool NetworkOPs::subServer(InfoSub* ispListener, Json::Value& jvResult) +bool NetworkOPs::subServer(InfoSub::ref isrListener, Json::Value& jvResult) { uint256 uRandom; @@ -1636,60 +1694,50 @@ bool NetworkOPs::subServer(InfoSub* ispListener, Json::Value& jvResult) jvResult["load_base"] = theApp->getFeeTrack().getLoadBase(); jvResult["load_factor"] = theApp->getFeeTrack().getLoadFactor(); - return mSubServer.insert(ispListener).second; + return mSubServer.insert(std::make_pair(isrListener->getSeq(), isrListener)).second; } // <-- bool: true=erased, false=was not there -bool NetworkOPs::unsubServer(InfoSub* ispListener) +bool NetworkOPs::unsubServer(uint64 uSeq) { - return !!mSubServer.erase(ispListener); + return !!mSubServer.erase(uSeq); } // <-- bool: true=added, false=already there -bool NetworkOPs::subTransactions(InfoSub* ispListener) +bool NetworkOPs::subTransactions(InfoSub::ref isrListener) { - return mSubTransactions.insert(ispListener).second; + return mSubTransactions.insert(std::make_pair(isrListener->getSeq(), isrListener)).second; } // <-- bool: true=erased, false=was not there -bool NetworkOPs::unsubTransactions(InfoSub* ispListener) +bool NetworkOPs::unsubTransactions(uint64 uSeq) { - return !!mSubTransactions.erase(ispListener); + return !!mSubTransactions.erase(uSeq); } // <-- bool: true=added, false=already there -bool NetworkOPs::subRTTransactions(InfoSub* ispListener) +bool NetworkOPs::subRTTransactions(InfoSub::ref isrListener) { - return mSubTransactions.insert(ispListener).second; + return mSubTransactions.insert(std::make_pair(isrListener->getSeq(), isrListener)).second; } // <-- bool: true=erased, false=was not there -bool NetworkOPs::unsubRTTransactions(InfoSub* ispListener) +bool NetworkOPs::unsubRTTransactions(uint64 uSeq) { - return !!mSubTransactions.erase(ispListener); + return !!mSubTransactions.erase(uSeq); } -RPCSub* NetworkOPs::findRpcSub(const std::string& strUrl) +InfoSub::pointer NetworkOPs::findRpcSub(const std::string& strUrl) { - RPCSub* rspResult; boost::recursive_mutex::scoped_lock sl(mMonitorLock); - subRpcMapType::iterator it; - - it = mRpcSubMap.find(strUrl); - if (it == mRpcSubMap.end()) - { - rspResult = (RPCSub*)(0); - } - else - { - rspResult = it->second; - } - - return rspResult; + subRpcMapType::iterator it = mRpcSubMap.find(strUrl); + if (it != mRpcSubMap.end()) + return it->second; + return InfoSub::pointer(); } -RPCSub* NetworkOPs::addRpcSub(const std::string& strUrl, RPCSub* rspEntry) +InfoSub::pointer NetworkOPs::addRpcSub(const std::string& strUrl, InfoSub::ref rspEntry) { boost::recursive_mutex::scoped_lock sl(mMonitorLock); diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 8d04c75614..acde0e81a2 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -23,8 +23,6 @@ class LedgerConsensus; DEFINE_INSTANCE(InfoSub); -class RPCSub; - class InfoSub : public IS_INSTANCE(InfoSub) { protected: @@ -33,12 +31,30 @@ protected: boost::mutex mLockInfo; + uint64 mSeq; + static uint64 sSeq; + static boost::mutex sSeqLock; + public: + typedef boost::shared_ptr pointer; + typedef boost::weak_ptr wptr; + typedef const boost::shared_ptr& ref; + + InfoSub() + { + boost::mutex::scoped_lock sl(sSeqLock); + mSeq = ++sSeq; + } virtual ~InfoSub(); virtual void send(const Json::Value& jvObj, bool broadcast) = 0; + uint64 getSeq() + { + return mSeq; + } + void onSendEmpty(); void insertSubAccountInfo(RippleAddress addr, uint32 uLedgerIndex) @@ -66,15 +82,14 @@ public: omFULL = 3 // we have the ledger and can even validate }; + typedef boost::unordered_map subMapType; + protected: - typedef boost::unordered_map > subInfoMapType; - typedef boost::unordered_map >::value_type subInfoMapValue; - typedef boost::unordered_map >::iterator subInfoMapIterator; + typedef boost::unordered_map subInfoMapType; + typedef boost::unordered_map::value_type subInfoMapValue; + typedef boost::unordered_map::iterator subInfoMapIterator; - typedef boost::unordered_map > subSubmitMapType; - //typedef boost::unordered_map > subOrderMap; - - typedef boost::unordered_map subRpcMapType; + typedef boost::unordered_map subRpcMapType; OperatingMode mMode; bool mNeedNetworkLedger; @@ -104,15 +119,14 @@ protected: boost::recursive_mutex mMonitorLock; subInfoMapType mSubAccount; subInfoMapType mSubRTAccount; - subSubmitMapType mSubmitMap; // TODO: probably dump this subRpcMapType mRpcSubMap; - boost::unordered_set mSubLedger; // accepted ledgers - boost::unordered_set mSubServer; // when server changes connectivity state - boost::unordered_set mSubTransactions; // all accepted transactions - boost::unordered_set mSubRTTransactions; // all proposed and accepted transactions + subMapType mSubLedger; // accepted ledgers + subMapType mSubServer; // when server changes connectivity state + subMapType mSubTransactions; // all accepted transactions + subMapType mSubRTTransactions; // all proposed and accepted transactions boost::recursive_mutex mWantedHashLock; boost::unordered_set mWantedHashes; @@ -292,26 +306,26 @@ public: // // Monitoring: subscriber side // - void subAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt); - void unsubAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, bool rt); + void subAccount(InfoSub::ref ispListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt); + void unsubAccount(uint64 uListener, const boost::unordered_set& vnaAccountIDs, bool rt); - bool subLedger(InfoSub* ispListener, Json::Value& jvResult); - bool unsubLedger(InfoSub* ispListener); + bool subLedger(InfoSub::ref ispListener, Json::Value& jvResult); + bool unsubLedger(uint64 uListener); - bool subServer(InfoSub* ispListener, Json::Value& jvResult); - bool unsubServer(InfoSub* ispListener); + bool subServer(InfoSub::ref ispListener, Json::Value& jvResult); + bool unsubServer(uint64 uListener); - bool subBook(InfoSub* ispListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut); - bool unsubBook(InfoSub* ispListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut); + bool subBook(InfoSub::ref ispListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut); + bool unsubBook(uint64 uListener, uint160 currencyIn, uint160 currencyOut, uint160 issuerIn, uint160 issuerOut); - bool subTransactions(InfoSub* ispListener); - bool unsubTransactions(InfoSub* ispListener); + bool subTransactions(InfoSub::ref ispListener); + bool unsubTransactions(uint64 uListener); - bool subRTTransactions(InfoSub* ispListener); - bool unsubRTTransactions(InfoSub* ispListener); + bool subRTTransactions(InfoSub::ref ispListener); + bool unsubRTTransactions(uint64 uListener); - RPCSub* findRpcSub(const std::string& strUrl); - RPCSub* addRpcSub(const std::string& strUrl, RPCSub* rspEntry); + InfoSub::pointer findRpcSub(const std::string& strUrl); + InfoSub::pointer addRpcSub(const std::string& strUrl, InfoSub::ref rspEntry); }; #endif diff --git a/src/cpp/ripple/OrderBookDB.cpp b/src/cpp/ripple/OrderBookDB.cpp index 64ff9804f9..da2050b90e 100644 --- a/src/cpp/ripple/OrderBookDB.cpp +++ b/src/cpp/ripple/OrderBookDB.cpp @@ -232,23 +232,31 @@ void OrderBookDB::processTxn(const SerializedTransaction& stTxn, TER terResult,T } } -void BookListeners::addSubscriber(InfoSub* sub) +void BookListeners::addSubscriber(InfoSub::ref sub) { - mListeners.insert(sub); + mListeners[sub->getSeq()] = sub; } -void BookListeners::removeSubscriber(InfoSub* sub) +void BookListeners::removeSubscriber(uint64 seq) { - mListeners.erase(sub); + mListeners.erase(seq); } void BookListeners::publish(Json::Value& jvObj) { //Json::Value jvObj=node.getJson(0); - BOOST_FOREACH(InfoSub* sub,mListeners) + NetworkOPs::subMapType::const_iterator it = mListeners.begin(); + while (it != mListeners.end()) { - sub->send(jvObj, true); + InfoSub::pointer p = it->second.lock(); + if (p) + { + p->send(jvObj, true); + ++it; + } + else + it = mListeners.erase(it); } } diff --git a/src/cpp/ripple/OrderBookDB.h b/src/cpp/ripple/OrderBookDB.h index 5563b2e3ef..6133f41e09 100644 --- a/src/cpp/ripple/OrderBookDB.h +++ b/src/cpp/ripple/OrderBookDB.h @@ -16,12 +16,12 @@ class BookListeners { - boost::unordered_set mListeners; + boost::unordered_map mListeners; public: typedef boost::shared_ptr pointer; - void addSubscriber(InfoSub* sub); - void removeSubscriber(InfoSub* sub); + void addSubscriber(InfoSub::ref sub); + void removeSubscriber(uint64 sub); void publish(Json::Value& jvObj); }; diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index d8fe8d02de..f9c0902848 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -66,13 +66,11 @@ int iAdminGet(const Json::Value& jvRequest, const std::string& strRemoteIp) RPCHandler::RPCHandler(NetworkOPs* netOps) { mNetOps = netOps; - mInfoSub = NULL; } -RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub* infoSub) +RPCHandler::RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub) : mInfoSub(infoSub) { mNetOps = netOps; - mInfoSub = infoSub; } Json::Value RPCHandler::transactionSign(Json::Value jvRequest, bool bSubmit) @@ -2492,7 +2490,7 @@ rt_accounts */ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) { - InfoSub* ispSub; + InfoSub::pointer ispSub; Json::Value jvResult(Json::objectValue); uint32 uLedgerIndex = jvRequest.isMember("ledger_index") && jvRequest["ledger_index"].isNumeric() ? jvRequest["ledger_index"].asUInt() @@ -2513,25 +2511,23 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) std::string strUsername = jvRequest.isMember("username") ? jvRequest["username"].asString() : ""; std::string strPassword = jvRequest.isMember("password") ? jvRequest["password"].asString() : ""; - RPCSub *rspSub = mNetOps->findRpcSub(strUrl); - if (!rspSub) + ispSub = mNetOps->findRpcSub(strUrl); + if (!ispSub) { cLog(lsDEBUG) << boost::str(boost::format("doSubscribe: building: %s") % strUrl); - rspSub = mNetOps->addRpcSub(strUrl, new RPCSub(strUrl, strUsername, strPassword)); + ispSub = mNetOps->addRpcSub(strUrl, boost::make_shared(strUrl, strUsername, strPassword)); } else { cLog(lsTRACE) << boost::str(boost::format("doSubscribe: reusing: %s") % strUrl); if (jvRequest.isMember("username")) - rspSub->setUsername(strUsername); + dynamic_cast(&*ispSub)->setUsername(strUsername); if (jvRequest.isMember("password")) - rspSub->setPassword(strPassword); + dynamic_cast(&*ispSub)->setPassword(strPassword); } - - ispSub = rspSub; } else { @@ -2631,7 +2627,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) // FIXME: This leaks RPCSub objects for JSON-RPC. Shouldn't matter for anyone sane. Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) { - InfoSub* ispSub; + InfoSub::pointer ispSub; Json::Value jvResult(Json::objectValue); if (!mInfoSub && !jvRequest.isMember("url")) @@ -2647,11 +2643,9 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) std::string strUrl = jvRequest["url"].asString(); - RPCSub *rspSub = mNetOps->findRpcSub(strUrl); - if (!rspSub) + ispSub = mNetOps->findRpcSub(strUrl); + if (!ispSub) return jvResult; - - ispSub = rspSub; } else { @@ -2668,19 +2662,19 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) if (streamName == "server") { - mNetOps->unsubServer(ispSub); + mNetOps->unsubServer(ispSub->getSeq()); } else if (streamName == "ledger") { - mNetOps->unsubLedger(ispSub); + mNetOps->unsubLedger(ispSub->getSeq()); } else if (streamName == "transactions") { - mNetOps->unsubTransactions(ispSub); + mNetOps->unsubTransactions(ispSub->getSeq()); } else if (streamName == "rt_transactions") { - mNetOps->unsubRTTransactions(ispSub); + mNetOps->unsubRTTransactions(ispSub->getSeq()); } else { @@ -2704,7 +2698,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) } else { - mNetOps->unsubAccount(ispSub, usnaAccoundIds, true); + mNetOps->unsubAccount(ispSub->getSeq(), usnaAccoundIds, true); } } @@ -2718,7 +2712,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) } else { - mNetOps->unsubAccount(ispSub, usnaAccoundIds, false); + mNetOps->unsubAccount(ispSub->getSeq(), usnaAccoundIds, false); } } @@ -2733,7 +2727,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) STAmount::issuerFromString(currencyOut,(*it)["CurrencyIn"].asString()); uint160 issuerIn=RippleAddress::createNodePublic( (*it)["IssuerIn"].asString() ).getAccountID(); - mNetOps->unsubBook(ispSub,currencyIn,currencyOut,issuerIn,issuerOut); + mNetOps->unsubBook(ispSub->getSeq(), currencyIn, currencyOut, issuerIn, issuerOut); } } diff --git a/src/cpp/ripple/RPCHandler.h b/src/cpp/ripple/RPCHandler.h index 07aec9944e..47e30c2085 100644 --- a/src/cpp/ripple/RPCHandler.h +++ b/src/cpp/ripple/RPCHandler.h @@ -15,9 +15,9 @@ class InfoSub; class RPCHandler { - NetworkOPs* mNetOps; - InfoSub* mInfoSub; - int mRole; + NetworkOPs* mNetOps; + InfoSub::pointer mInfoSub; + int mRole; typedef Json::Value (RPCHandler::*doFuncPtr)(Json::Value params); enum { @@ -115,7 +115,7 @@ public: enum { GUEST, USER, ADMIN, FORBID }; RPCHandler(NetworkOPs* netOps); - RPCHandler(NetworkOPs* netOps, InfoSub* infoSub); + RPCHandler(NetworkOPs* netOps, InfoSub::pointer infoSub); Json::Value doCommand(const Json::Value& jvRequest, int role); Json::Value doRpcCommand(const std::string& strCommand, Json::Value& jvParams, int iRole); diff --git a/src/cpp/ripple/RPCSub.h b/src/cpp/ripple/RPCSub.h index 03172410f7..cb8c94348d 100644 --- a/src/cpp/ripple/RPCSub.h +++ b/src/cpp/ripple/RPCSub.h @@ -30,6 +30,9 @@ protected: void sendThread(); public: + typedef boost::shared_ptr pointer; + typedef const pointer& ref; + RPCSub(const std::string& strUrl, const std::string& strUsername, const std::string& strPassword); virtual ~RPCSub() { ; } diff --git a/src/cpp/ripple/WSConnection.h b/src/cpp/ripple/WSConnection.h index 291a6b035c..acd3b2c865 100644 --- a/src/cpp/ripple/WSConnection.h +++ b/src/cpp/ripple/WSConnection.h @@ -6,6 +6,7 @@ #include #include +#include #include "WSDoor.h" #include "Application.h" @@ -28,7 +29,8 @@ class WSServerHandler; // - Subscriptions // template -class WSConnection : public InfoSub, public IS_INSTANCE(WebSocketConnection) +class WSConnection : public InfoSub, public IS_INSTANCE(WebSocketConnection), + public boost::enable_shared_from_this< WSConnection > { public: typedef typename endpoint_type::connection_type connection; @@ -101,7 +103,7 @@ public: return jvResult; } - RPCHandler mRPCHandler(&mNetwork, this); + RPCHandler mRPCHandler(&mNetwork, this->shared_from_this()); Json::Value jvResult(Json::objectValue); int iRole = mHandler->getPublic()