From 5a6349b328867eb40a434f87158845a5e52feeaf Mon Sep 17 00:00:00 2001 From: Arthur Britto Date: Fri, 29 Jun 2012 16:23:06 -0700 Subject: [PATCH] Add support for WS account_transaction_subscribe and account_transaction_unsubscribe. --- src/NetworkOPs.cpp | 120 ++++++++++++++++++++++++++++++++++++++++----- src/NetworkOPs.h | 13 +++-- src/WSDoor.cpp | 90 ++++++++++++++++++++++++++++++---- 3 files changed, 196 insertions(+), 27 deletions(-) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index cc3729701f..5195ab28d9 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -650,7 +650,7 @@ std::vector< std::pair > } std::vector - NetworkOPs::getAffectedAccounts(uint32 ledgerSeq) + NetworkOPs::getLedgerAffectedAccounts(uint32 ledgerSeq) { std::vector accounts; std::string sql = str(boost::format @@ -702,7 +702,7 @@ void NetworkOPs::pubAccountInfo(const NewcoinAddress& naAccountID, const Json::V { boost::interprocess::sharable_lock sl(mMonitorLock); - subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID); + subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID.getAccountID()); if (simIterator == mSubAccountInfo.end()) { @@ -742,11 +742,11 @@ void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted) { boost::interprocess::sharable_lock sl(mMonitorLock); - if (!mSubLedgerAccounts.empty()) + if (!mSubAccountTransaction.empty()) { Json::Value jvAccounts(Json::arrayValue); - BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq())) + BOOST_FOREACH(const NewcoinAddress& naAccountID, getLedgerAffectedAccounts(lpAccepted->getLedgerSeq())) { jvAccounts.append(Json::Value(naAccountID.humanAccountID())); } @@ -769,17 +769,62 @@ void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted) void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult) { - // std::vector affectedAccounts = stTxn.getAffectedAccounts(); - boost::interprocess::scoped_lock sl(mMonitorLock); - if (!mSubTransaction.empty()) { + boost::interprocess::sharable_lock sl(mMonitorLock); + if (!mSubTransaction.empty()) + { + Json::Value jvObj(Json::objectValue); + + jvObj["type"] = "transactionProposed"; + jvObj["seq"] = lpCurrent->getLedgerSeq(); + jvObj["transaction"] = stTxn.getJson(0); + + BOOST_FOREACH(InfoSub* ispListener, mSubTransaction) + { + ispListener->send(jvObj); + } + } + } + + boost::unordered_set usisNotify; + + { + boost::interprocess::sharable_lock sl(mMonitorLock); + + if (!mSubAccountTransaction.empty()) + { + BOOST_FOREACH(const NewcoinAddress& naAccountPublic, stTxn.getAffectedAccounts()) + { + subInfoMapIterator simiIt = mSubAccountTransaction.find(naAccountPublic.getAccountID()); + + if (simiIt != mSubAccountTransaction.end()) + { + BOOST_FOREACH(InfoSub* ispListener, simiIt->second) + { + usisNotify.insert(ispListener); + } + } + } + } + } + + if (!usisNotify.empty()) + { + Json::Value jvAccounts(Json::arrayValue); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, stTxn.getAffectedAccounts()) + { + jvAccounts.append(Json::Value(naAccountID.humanAccountID())); + } + Json::Value jvObj(Json::objectValue); - jvObj["type"] = "transactionProposed"; + jvObj["type"] = "accountTransactionProposed"; jvObj["seq"] = lpCurrent->getLedgerSeq(); - jvObj["transaction"] = stTxn.getJson(0); // XXX Verify what options there are. + jvObj["accounts"] = jvAccounts; + jvObj["transaction"] = stTxn.getJson(0); - BOOST_FOREACH(InfoSub* ispListener, mSubTransaction) + BOOST_FOREACH(InfoSub* ispListener, usisNotify) { ispListener->send(jvObj); } @@ -796,14 +841,14 @@ void NetworkOPs::subAccountInfo(InfoSub* ispListener, const boost::unordered_set BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs) { - subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID); + subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID.getAccountID()); if (simIterator == mSubAccountInfo.end()) { // Not found boost::unordered_set usisElement; usisElement.insert(ispListener); - mSubAccountInfo.insert(simIterator, make_pair(naAccountID, usisElement)); + mSubAccountInfo.insert(simIterator, make_pair(naAccountID.getAccountID(), usisElement)); } else { @@ -819,7 +864,7 @@ void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const boost::unordered_s BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs) { - subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID); + subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID.getAccountID()); if (simIterator == mSubAccountInfo.end()) { // Not found. Done. @@ -839,6 +884,55 @@ void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const boost::unordered_s } } +void NetworkOPs::subAccountTransaction(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs) +{ + boost::interprocess::scoped_lock sl(mMonitorLock); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs) + { + subInfoMapType::iterator simIterator = mSubAccountTransaction.find(naAccountID.getAccountID()); + if (simIterator == mSubAccountTransaction.end()) + { + // Not found + boost::unordered_set usisElement; + + usisElement.insert(ispListener); + mSubAccountTransaction.insert(simIterator, make_pair(naAccountID.getAccountID(), usisElement)); + } + else + { + // Found + simIterator->second.insert(ispListener); + } + } +} + +void NetworkOPs::unsubAccountTransaction(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs) +{ + boost::interprocess::scoped_lock sl(mMonitorLock); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs) + { + subInfoMapType::iterator simIterator = mSubAccountTransaction.find(naAccountID.getAccountID()); + if (simIterator == mSubAccountTransaction.end()) + { + // Not found. Done. + nothing(); + } + else + { + // Found + simIterator->second.erase(ispListener); + + if (simIterator->second.empty()) + { + // Don't need hash entry. + mSubAccountTransaction.erase(simIterator); + } + } + } +} + #if 0 void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash) { diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 9ef9c38f29..cd2d570b7e 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -53,16 +53,18 @@ protected: void setMode(OperatingMode); - typedef boost::unordered_map > subInfoMapType; - typedef boost::unordered_map >::iterator subInfoMapIterator; + typedef boost::unordered_map > subInfoMapType; + typedef boost::unordered_map >::value_type subInfoMapValue; + typedef boost::unordered_map >::iterator subInfoMapIterator; // XXX Split into more locks. boost::interprocess::interprocess_upgradable_mutex mMonitorLock; subInfoMapType mSubAccountInfo; + subInfoMapType mSubAccountTransaction; boost::unordered_set mSubLedger; // ledger accepteds boost::unordered_set mSubLedgerAccounts; // ledger accepteds + affected accounts boost::unordered_set mSubTransaction; // all transactions - subInfoMapType mSubTransactionAccounts; +// subInfoMapType mSubTransactionAccounts; public: NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster); @@ -156,7 +158,7 @@ public: // client information retrieval functions std::vector< std::pair > getAffectedAccounts(const NewcoinAddress& account, uint32 minLedger, uint32 maxLedger); - std::vector getAffectedAccounts(uint32 ledgerSeq); + std::vector getLedgerAffectedAccounts(uint32 ledgerSeq); std::vector getLedgerTransactions(uint32 ledgerSeq); // @@ -175,6 +177,9 @@ public: void subAccountInfo(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs); void unsubAccountInfo(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs); + void subAccountTransaction(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs); + void unsubAccountTransaction(InfoSub* ispListener, const boost::unordered_set& vnaAccountIDs); + // void subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash); // void unsubAccountChanges(InfoSub* ispListener); diff --git a/src/WSDoor.cpp b/src/WSDoor.cpp index 159356e380..baaec5b013 100644 --- a/src/WSDoor.cpp +++ b/src/WSDoor.cpp @@ -53,6 +53,7 @@ protected: boost::mutex mLock; boost::unordered_set mSubAccountInfo; + boost::unordered_set mSubAccountTransaction; WSServerHandler* mHandler; connection_ptr mConnection; @@ -77,6 +78,8 @@ public: // Commands void doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest); void doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doAccountTransactionSubscribe(Json::Value& jvResult, const Json::Value& jvRequest); + void doAccountTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); void doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest); void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest); @@ -272,6 +275,7 @@ WSConnection::~WSConnection() theApp->getOPs().unsubLedger(this); theApp->getOPs().unsubLedgerAccounts(this); theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo); + theApp->getOPs().unsubAccountTransaction(this, mSubAccountTransaction); } void WSConnection::send(const Json::Value& jvObj) @@ -289,14 +293,16 @@ Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest) const char* pCommand; doFuncPtr dfpFunc; } commandsA[] = { - { "account_info_subscribe", &WSConnection::doAccountInfoSubscribe }, - { "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe }, - { "ledger_subscribe", &WSConnection::doLedgerSubcribe }, - { "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe }, - { "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe }, - { "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe }, - { "transaction_subscribe", &WSConnection::doTransactionSubcribe }, - { "transaction_unsubscribe", &WSConnection::doTransactionUnsubscribe }, + { "account_info_subscribe", &WSConnection::doAccountInfoSubscribe }, + { "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe }, + { "account_transaction_subscribe", &WSConnection::doAccountTransactionSubscribe }, + { "account_transaction_unsubscribe", &WSConnection::doAccountTransactionUnsubscribe }, + { "ledger_subscribe", &WSConnection::doLedgerSubcribe }, + { "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe }, + { "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe }, + { "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe }, + { "transaction_subscribe", &WSConnection::doTransactionSubcribe }, + { "transaction_unsubscribe", &WSConnection::doTransactionUnsubscribe }, }; if (!jvRequest.isMember("command")) @@ -381,7 +387,7 @@ void WSConnection::doAccountInfoSubscribe(Json::Value& jvResult, const Json::Val { jvResult["error"] = "missingField"; } - else if (jvResult["accounts"].empty()) + else if (jvRequest["accounts"].empty()) { jvResult["error"] = "emptySet"; } @@ -413,7 +419,7 @@ void WSConnection::doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::V { jvResult["error"] = "missingField"; } - else if (jvResult["accounts"].empty()) + else if (jvRequest["accounts"].empty()) { jvResult["error"] = "emptySet"; } @@ -439,6 +445,70 @@ void WSConnection::doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::V } } +void WSConnection::doAccountTransactionSubscribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!jvRequest.isMember("accounts")) + { + jvResult["error"] = "missingField"; + } + else if (jvRequest["accounts"].empty()) + { + jvResult["error"] = "emptySet"; + } + else + { + boost::unordered_set usnaAccoundIds = parseAccountIds(jvRequest["accounts"]); + + if (usnaAccoundIds.empty()) + { + jvResult["error"] = "malformedAccount"; + } + else + { + boost::mutex::scoped_lock sl(mLock); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds) + { + mSubAccountTransaction.insert(naAccountID); + } + + theApp->getOPs().subAccountTransaction(this, usnaAccoundIds); + } + } +} + +void WSConnection::doAccountTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest) +{ + if (!jvRequest.isMember("accounts")) + { + jvResult["error"] = "missingField"; + } + else if (jvRequest["accounts"].empty()) + { + jvResult["error"] = "emptySet"; + } + else + { + boost::unordered_set usnaAccoundIds = parseAccountIds(jvRequest["accounts"]); + + if (usnaAccoundIds.empty()) + { + jvResult["error"] = "malformedAccount"; + } + else + { + boost::mutex::scoped_lock sl(mLock); + + BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds) + { + mSubAccountTransaction.erase(naAccountID); + } + + theApp->getOPs().unsubAccountTransaction(this, usnaAccoundIds); + } + } +} + void WSConnection::doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest) { if (!theApp->getOPs().subLedger(this))