diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 13e917e56..cc5287983 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -27,6 +27,11 @@ SETUP_LOG(); DECLARE_INSTANCE(InfoSub); +void InfoSub::onSendEmpty() +{ + +} + NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) : mMode(omDISCONNECTED), mNeedNetworkLedger(false), mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0), mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), @@ -1189,7 +1194,7 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted) } } - // we don't lock since pubAcceptedTransaction is locking + // Don't lock since pubAcceptedTransaction is locking. if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() ) { SHAMap& txSet = *lpAccepted->peekTransactionMap(); @@ -1241,10 +1246,12 @@ Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, TER terRes void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult,TransactionMetaSet::pointer& meta) { Json::Value jvObj = transJson(stTxn, terResult, true, lpCurrent, "transaction"); - if(meta) jvObj["meta"]=meta->getJson(0); + + if (meta) jvObj["meta"] = meta->getJson(0); { boost::recursive_mutex::scoped_lock sl(mMonitorLock); + BOOST_FOREACH(InfoSub* ispListener, mSubTransactions) { ispListener->send(jvObj); @@ -1256,22 +1263,22 @@ void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedT } } - pubAccountTransaction(lpCurrent,stTxn,terResult,true,meta); + pubAccountTransaction(lpCurrent, stTxn, terResult, true, meta); } - -void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, bool bAccepted,TransactionMetaSet::pointer& meta) +void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, bool bAccepted, TransactionMetaSet::pointer& meta) { boost::unordered_set notify; { boost::recursive_mutex::scoped_lock sl(mMonitorLock); - if(!bAccepted && mSubRTAccount.empty()) return; + if (!bAccepted && mSubRTAccount.empty()) return; if (!mSubAccount.empty() || (!mSubRTAccount.empty()) ) { typedef std::map::value_type AccountPair; + BOOST_FOREACH(const AccountPair& affectedAccount, getAffectedAccounts(stTxn)) { subInfoMapIterator simiIt = mSubRTAccount.find(affectedAccount.first.getAccountID()); @@ -1283,7 +1290,8 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr notify.insert(ispListener); } } - if(bAccepted) + + if (bAccepted) { simiIt = mSubAccount.find(affectedAccount.first.getAccountID()); @@ -1302,7 +1310,8 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr if (!notify.empty()) { Json::Value jvObj = transJson(stTxn, terResult, bAccepted, lpCurrent, "account"); - if(meta) jvObj["meta"]=meta->getJson(0); + + if (meta) jvObj["meta"] = meta->getJson(0); BOOST_FOREACH(InfoSub* ispListener, notify) { @@ -1344,12 +1353,15 @@ std::map NetworkOPs::getAffectedAccounts(const SerializedTra // Monitoring // - - -void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs,bool rt) +void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt) { - subInfoMapType& subMap=mSubAccount; - if(rt) subMap=mSubRTAccount; + subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount; + + // For the connection, monitor each account. + BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs) + { + ispListener->insertSubAccountInfo(naAccountID, uLedgerIndex); + } boost::recursive_mutex::scoped_lock sl(mMonitorLock); @@ -1358,7 +1370,7 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set usisElement; usisElement.insert(ispListener); @@ -1366,21 +1378,30 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_setsecond.insert(ispListener); } } } -void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs,bool rt) +void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, bool rt) { - subInfoMapType& subMap= rt ? mSubRTAccount : mSubAccount; + subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount; + + // For the connection, unmonitor each account. + // FIXME: Don't we need to unsub? + // BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs) + // { + // ispListener->deleteSubAccountInfo(naAccountID); + // } boost::recursive_mutex::scoped_lock sl(mMonitorLock); BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs) { subInfoMapType::iterator simIterator = subMap.find(naAccountID.getAccountID()); + + if (simIterator == mSubAccount.end()) { // Not found. Done. diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 0bc53601b..66d757ffa 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -26,12 +26,6 @@ class RPCSub; class InfoSub : public IS_INSTANCE(InfoSub) { -public: - - virtual ~InfoSub(); - - virtual void send(const Json::Value& jvObj) = 0; - protected: boost::unordered_set mSubAccountInfo; boost::unordered_set mSubAccountTransaction; @@ -39,9 +33,17 @@ protected: boost::mutex mLockInfo; public: - void insertSubAccountInfo(RippleAddress addr) + + virtual ~InfoSub(); + + virtual void send(const Json::Value& jvObj) = 0; + + void onSendEmpty(); + + void insertSubAccountInfo(RippleAddress addr, uint32 uLedgerIndex) { boost::mutex::scoped_lock sl(mLockInfo); + mSubAccountInfo.insert(addr); } }; @@ -267,8 +269,8 @@ public: // // Monitoring: subscriber side // - void subAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs,bool rt); - void unsubAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs,bool rt); + void subAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, uint32 uLedgerIndex, bool rt); + void unsubAccount(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs, bool rt); bool subLedger(InfoSub* ispListener, Json::Value& jvResult); bool unsubLedger(InfoSub* ispListener); diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 65508ba7e..cc64c1280 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -514,7 +514,7 @@ Json::Value RPCHandler::doProfile(Json::Value jvRequest) STAmount(uCurrencyOfferB, naAccountB.getAccountID(), 1+n), // saTakerGets 0); // uExpiration - if(bSubmit) + if (bSubmit) tpOfferA = mNetOps->submitTransactionSync(tpOfferA); // FIXME: Don't use synch interface } @@ -1194,7 +1194,7 @@ Json::Value RPCHandler::doTxHistory(Json::Value jvRequest) SQL_FOREACH(db, sql) { Transaction::pointer trans=Transaction::transactionFromSQL(db, false); - if(trans) txs.append(trans->getJson(0)); + if (trans) txs.append(trans->getJson(0)); } } @@ -1344,8 +1344,10 @@ Json::Value RPCHandler::doAccountTransactions(Json::Value jvRequest) for (std::vector< std::pair >::iterator it = txns.begin(), end = txns.end(); it != end; ++it) { Json::Value obj(Json::objectValue); - if(it->first) obj["tx"]=it->first->getJson(1); - if(it->second) obj["meta"]=it->second->getJson(0); + + if (it->first) obj["tx"] = it->first->getJson(1); + if (it->second) obj["meta"] = it->second->getJson(0); + ret["transactions"].append(obj); } return ret; @@ -2149,6 +2151,9 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) { InfoSub* ispSub; Json::Value jvResult(Json::objectValue); + uint32 uLedgerIndex = jvRequest.isMember("ledger_index") && jvRequest["ledger_index"].isNumeric() + ? jvRequest["ledger_index"].asUInt() + : 0; if (!mInfoSub && !jvRequest.isMember("url")) { @@ -2235,12 +2240,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) } else { - BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) - { - ispSub->insertSubAccountInfo(naAccountID); - } - - mNetOps->subAccount(ispSub, usnaAccoundIds, true); + mNetOps->subAccount(ispSub, usnaAccoundIds, uLedgerIndex, true); } } @@ -2254,12 +2254,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest) } else { - BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) - { - ispSub->insertSubAccountInfo(naAccountID); - } - - mNetOps->subAccount(ispSub, usnaAccoundIds, false); + mNetOps->subAccount(ispSub, usnaAccoundIds, uLedgerIndex, false); } } @@ -2342,12 +2337,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) } else { - BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) - { - ispSub->insertSubAccountInfo(naAccountID); - } - - mNetOps->unsubAccount(ispSub, usnaAccoundIds,true); + mNetOps->unsubAccount(ispSub, usnaAccoundIds, true); } } @@ -2361,12 +2351,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest) } else { - BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds) - { - ispSub->insertSubAccountInfo(naAccountID); - } - - mNetOps->unsubAccount(ispSub, usnaAccoundIds,false); + mNetOps->unsubAccount(ispSub, usnaAccoundIds, false); } } diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h index ed6e4ba0d..887303b29 100644 --- a/src/cpp/ripple/WSHandler.h +++ b/src/cpp/ripple/WSHandler.h @@ -103,6 +103,22 @@ public: } } + void on_send_empty(connection_ptr cpClient) + { + typedef boost::shared_ptr< WSConnection > wsc_ptr; + + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + + ptr->onSendEmpty(); + } + void on_open(connection_ptr cpClient) { boost::mutex::scoped_lock sl(mMapLock); diff --git a/src/js/remote.js b/src/js/remote.js index 8207d1971..e64a26671 100644 --- a/src/js/remote.js +++ b/src/js/remote.js @@ -687,6 +687,7 @@ Remote.prototype.request_ledger_entry = function (type) { return request; }; +// .accounts(accounts, realtime) Remote.prototype.request_subscribe = function (streams) { var request = new Request(this, 'subscribe'); @@ -700,6 +701,7 @@ Remote.prototype.request_subscribe = function (streams) { return request; }; +// .accounts(accounts, realtime) Remote.prototype.request_unsubscribe = function (streams) { var request = new Request(this, 'unsubscribe');