diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 09200356b5..2f8afe372f 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -717,49 +717,68 @@ void NetworkOPs::pubAccountInfo(const NewcoinAddress& naAccountID, const Json::V void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted) { - if (!mSubLedger.empty()) { - Json::Value jvObj(Json::objectValue); - - jvObj["type"] = "ledgerAccepted"; - jvObj["seq"] = lpAccepted->getLedgerSeq(); - jvObj["hash"] = lpAccepted->getHash().ToString(); - jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC()); - boost::interprocess::sharable_lock sl(mMonitorLock); - BOOST_FOREACH(InfoSub* ispListener, mSubLedger) + + if (!mSubLedger.empty()) { - ispListener->send(jvObj); + Json::Value jvObj(Json::objectValue); + + jvObj["type"] = "ledgerAccepted"; + jvObj["seq"] = lpAccepted->getLedgerSeq(); + jvObj["hash"] = lpAccepted->getHash().ToString(); + jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC()); + + BOOST_FOREACH(InfoSub* ispListener, mSubLedger) + { + ispListener->send(jvObj); + } } } - if (!mSubLedgerAccounts.empty()) { - Json::Value jvAccounts(Json::arrayValue); - - BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq())) - { - jvAccounts.append(Json::Value(naAccountID.humanAccountID())); - } - - Json::Value jvObj(Json::objectValue); - - jvObj["type"] = "ledgerAcceptedAccounts"; - jvObj["seq"] = lpAccepted->getLedgerSeq(); - jvObj["hash"] = lpAccepted->getHash().ToString(); - jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC()); - jvObj["accounts"] = jvAccounts; - boost::interprocess::sharable_lock sl(mMonitorLock); - BOOST_FOREACH(InfoSub* ispListener, mSubLedgerAccounts) + if (!mSubLedgerAccounts.empty()) { - ispListener->send(jvObj); + Json::Value jvAccounts(Json::arrayValue); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq())) + { + jvAccounts.append(Json::Value(naAccountID.humanAccountID())); + } + + Json::Value jvObj(Json::objectValue); + + jvObj["type"] = "ledgerAcceptedAccounts"; + jvObj["seq"] = lpAccepted->getLedgerSeq(); + jvObj["hash"] = lpAccepted->getHash().ToString(); + jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC()); + jvObj["accounts"] = jvAccounts; + + BOOST_FOREACH(InfoSub* ispListener, mSubLedgerAccounts) + { + ispListener->send(jvObj); + } } } } void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const std::vector& naAffectedAccountIds) { + boost::interprocess::scoped_lock sl(mMonitorLock); + if (!mSubTransaction.empty()) + { + Json::Value jvObj(Json::objectValue); + + jvObj["type"] = "transactionProposed"; + jvObj["seq"] = lpCurrent->getLedgerSeq(); + jvObj["transaction"] = stTxn.getJson(0); // XXX Verify what options there are. + + BOOST_FOREACH(InfoSub* ispListener, mSubTransaction) + { + ispListener->send(jvObj); + } + } } // @@ -849,4 +868,16 @@ bool NetworkOPs::unsubLedgerAccounts(InfoSub* ispListener) return !!mSubLedgerAccounts.erase(ispListener); } +// <-- bool: true=added, false=already there +bool NetworkOPs::subTransaction(InfoSub* ispListener) +{ + return mSubTransaction.insert(ispListener).second; +} + +// <-- bool: true=erased, false=was not there +bool NetworkOPs::unsubTransaction(InfoSub* ispListener) +{ + return !!mSubTransaction.erase(ispListener); +} + // vim:ts=4 diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 060128b31f..8b3717b081 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -56,10 +56,13 @@ protected: typedef boost::unordered_map > subInfoMapType; typedef boost::unordered_map >::iterator subInfoMapIterator; + // XXX Split into more locks. boost::interprocess::interprocess_upgradable_mutex mMonitorLock; subInfoMapType mSubAccountInfo; boost::unordered_set mSubLedger; // ledger accepteds boost::unordered_set mSubLedgerAccounts; // ledger accepteds + affected accounts + boost::unordered_set mSubTransaction; // all transactions + subInfoMapType mSubTransactionAccounts; public: NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster); @@ -180,6 +183,9 @@ public: bool subLedgerAccounts(InfoSub* ispListener); bool unsubLedgerAccounts(InfoSub* ispListener); + + bool subTransaction(InfoSub* ispListener); + bool unsubTransaction(InfoSub* ispListener); }; #endif diff --git a/src/WSDoor.cpp b/src/WSDoor.cpp index b6274c4285..159356e380 100644 --- a/src/WSDoor.cpp +++ b/src/WSDoor.cpp @@ -82,9 +82,10 @@ public: void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); void doLedgerAccountsSubcribe(Json::Value& jvResult, const Json::Value& jvRequest); void doLedgerAccountsUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doTransactionSubcribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); }; - // A single instance of this object is made. // This instance dispatches all events. There is no per connection persistence. template @@ -267,6 +268,7 @@ void WSDoor::stop() WSConnection::~WSConnection() { + theApp->getOPs().unsubTransaction(this); theApp->getOPs().unsubLedger(this); theApp->getOPs().unsubLedgerAccounts(this); theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo); @@ -287,12 +289,14 @@ Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest) const char* pCommand; doFuncPtr dfpFunc; } commandsA[] = { - { "account_info_subscribe", &WSConnection::doAccountInfoSubscribe }, - { "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe }, - { "ledger_subscribe", &WSConnection::doLedgerSubcribe }, - { "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe }, + { "account_info_subscribe", &WSConnection::doAccountInfoSubscribe }, + { "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe }, + { "ledger_subscribe", &WSConnection::doLedgerSubcribe }, + { "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe }, { "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe }, { "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe }, + { "transaction_subscribe", &WSConnection::doTransactionSubcribe }, + { "transaction_unsubscribe", &WSConnection::doTransactionUnsubscribe }, }; if (!jvRequest.isMember("command")) @@ -467,4 +471,20 @@ void WSConnection::doLedgerAccountsUnsubscribe(Json::Value& jvResult, const Json } } +void WSConnection::doTransactionSubcribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!theApp->getOPs().subTransaction(this)) + { + jvResult["error"] = "TransactionsSubscribed"; + } +} + +void WSConnection::doTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!theApp->getOPs().unsubTransaction(this)) + { + jvResult["error"] = "TransactionsNotSubscribed"; + } +} + // vim:ts=4