Add support for WS account_transaction_subscribe and account_transaction_unsubscribe.

This commit is contained in:
Arthur Britto
2012-06-29 16:23:06 -07:00
parent 22742314f3
commit 5a6349b328
3 changed files with 196 additions and 27 deletions

View File

@@ -650,7 +650,7 @@ std::vector< std::pair<uint32, uint256> >
}
std::vector<NewcoinAddress>
NetworkOPs::getAffectedAccounts(uint32 ledgerSeq)
NetworkOPs::getLedgerAffectedAccounts(uint32 ledgerSeq)
{
std::vector<NewcoinAddress> accounts;
std::string sql = str(boost::format
@@ -702,7 +702,7 @@ void NetworkOPs::pubAccountInfo(const NewcoinAddress& naAccountID, const Json::V
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID);
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID.getAccountID());
if (simIterator == mSubAccountInfo.end())
{
@@ -742,11 +742,11 @@ void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted)
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubLedgerAccounts.empty())
if (!mSubAccountTransaction.empty())
{
Json::Value jvAccounts(Json::arrayValue);
BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq()))
BOOST_FOREACH(const NewcoinAddress& naAccountID, getLedgerAffectedAccounts(lpAccepted->getLedgerSeq()))
{
jvAccounts.append(Json::Value(naAccountID.humanAccountID()));
}
@@ -769,17 +769,62 @@ void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted)
void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult)
{
// std::vector<NewcoinAddress> affectedAccounts = stTxn.getAffectedAccounts();
boost::interprocess::scoped_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubTransaction.empty())
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubTransaction.empty())
{
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "transactionProposed";
jvObj["seq"] = lpCurrent->getLedgerSeq();
jvObj["transaction"] = stTxn.getJson(0);
BOOST_FOREACH(InfoSub* ispListener, mSubTransaction)
{
ispListener->send(jvObj);
}
}
}
boost::unordered_set<InfoSub*> usisNotify;
{
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubAccountTransaction.empty())
{
BOOST_FOREACH(const NewcoinAddress& naAccountPublic, stTxn.getAffectedAccounts())
{
subInfoMapIterator simiIt = mSubAccountTransaction.find(naAccountPublic.getAccountID());
if (simiIt != mSubAccountTransaction.end())
{
BOOST_FOREACH(InfoSub* ispListener, simiIt->second)
{
usisNotify.insert(ispListener);
}
}
}
}
}
if (!usisNotify.empty())
{
Json::Value jvAccounts(Json::arrayValue);
BOOST_FOREACH(const NewcoinAddress& naAccountID, stTxn.getAffectedAccounts())
{
jvAccounts.append(Json::Value(naAccountID.humanAccountID()));
}
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "transactionProposed";
jvObj["type"] = "accountTransactionProposed";
jvObj["seq"] = lpCurrent->getLedgerSeq();
jvObj["transaction"] = stTxn.getJson(0); // XXX Verify what options there are.
jvObj["accounts"] = jvAccounts;
jvObj["transaction"] = stTxn.getJson(0);
BOOST_FOREACH(InfoSub* ispListener, mSubTransaction)
BOOST_FOREACH(InfoSub* ispListener, usisNotify)
{
ispListener->send(jvObj);
}
@@ -796,14 +841,14 @@ void NetworkOPs::subAccountInfo(InfoSub* ispListener, const boost::unordered_set
BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs)
{
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID);
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID.getAccountID());
if (simIterator == mSubAccountInfo.end())
{
// Not found
boost::unordered_set<InfoSub*> usisElement;
usisElement.insert(ispListener);
mSubAccountInfo.insert(simIterator, make_pair(naAccountID, usisElement));
mSubAccountInfo.insert(simIterator, make_pair(naAccountID.getAccountID(), usisElement));
}
else
{
@@ -819,7 +864,7 @@ void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const boost::unordered_s
BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs)
{
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID);
subInfoMapType::iterator simIterator = mSubAccountInfo.find(naAccountID.getAccountID());
if (simIterator == mSubAccountInfo.end())
{
// Not found. Done.
@@ -839,6 +884,55 @@ void NetworkOPs::unsubAccountInfo(InfoSub* ispListener, const boost::unordered_s
}
}
void NetworkOPs::subAccountTransaction(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs)
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs)
{
subInfoMapType::iterator simIterator = mSubAccountTransaction.find(naAccountID.getAccountID());
if (simIterator == mSubAccountTransaction.end())
{
// Not found
boost::unordered_set<InfoSub*> usisElement;
usisElement.insert(ispListener);
mSubAccountTransaction.insert(simIterator, make_pair(naAccountID.getAccountID(), usisElement));
}
else
{
// Found
simIterator->second.insert(ispListener);
}
}
}
void NetworkOPs::unsubAccountTransaction(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs)
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
BOOST_FOREACH(const NewcoinAddress& naAccountID, vnaAccountIDs)
{
subInfoMapType::iterator simIterator = mSubAccountTransaction.find(naAccountID.getAccountID());
if (simIterator == mSubAccountTransaction.end())
{
// Not found. Done.
nothing();
}
else
{
// Found
simIterator->second.erase(ispListener);
if (simIterator->second.empty())
{
// Don't need hash entry.
mSubAccountTransaction.erase(simIterator);
}
}
}
}
#if 0
void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash)
{

View File

@@ -53,16 +53,18 @@ protected:
void setMode(OperatingMode);
typedef boost::unordered_map<NewcoinAddress,boost::unordered_set<InfoSub*> > subInfoMapType;
typedef boost::unordered_map<NewcoinAddress,boost::unordered_set<InfoSub*> >::iterator subInfoMapIterator;
typedef boost::unordered_map<uint160,boost::unordered_set<InfoSub*> > subInfoMapType;
typedef boost::unordered_map<uint160,boost::unordered_set<InfoSub*> >::value_type subInfoMapValue;
typedef boost::unordered_map<uint160,boost::unordered_set<InfoSub*> >::iterator subInfoMapIterator;
// XXX Split into more locks.
boost::interprocess::interprocess_upgradable_mutex mMonitorLock;
subInfoMapType mSubAccountInfo;
subInfoMapType mSubAccountTransaction;
boost::unordered_set<InfoSub*> mSubLedger; // ledger accepteds
boost::unordered_set<InfoSub*> mSubLedgerAccounts; // ledger accepteds + affected accounts
boost::unordered_set<InfoSub*> mSubTransaction; // all transactions
subInfoMapType mSubTransactionAccounts;
// subInfoMapType mSubTransactionAccounts;
public:
NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster);
@@ -156,7 +158,7 @@ public:
// client information retrieval functions
std::vector< std::pair<uint32, uint256> >
getAffectedAccounts(const NewcoinAddress& account, uint32 minLedger, uint32 maxLedger);
std::vector<NewcoinAddress> getAffectedAccounts(uint32 ledgerSeq);
std::vector<NewcoinAddress> getLedgerAffectedAccounts(uint32 ledgerSeq);
std::vector<SerializedTransaction> getLedgerTransactions(uint32 ledgerSeq);
//
@@ -175,6 +177,9 @@ public:
void subAccountInfo(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs);
void unsubAccountInfo(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs);
void subAccountTransaction(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs);
void unsubAccountTransaction(InfoSub* ispListener, const boost::unordered_set<NewcoinAddress>& vnaAccountIDs);
// void subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash);
// void unsubAccountChanges(InfoSub* ispListener);

View File

@@ -53,6 +53,7 @@ protected:
boost::mutex mLock;
boost::unordered_set<NewcoinAddress> mSubAccountInfo;
boost::unordered_set<NewcoinAddress> mSubAccountTransaction;
WSServerHandler<websocketpp::WSDOOR_SERVER>* mHandler;
connection_ptr mConnection;
@@ -77,6 +78,8 @@ public:
// Commands
void doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doAccountTransactionSubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doAccountTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
@@ -272,6 +275,7 @@ WSConnection::~WSConnection()
theApp->getOPs().unsubLedger(this);
theApp->getOPs().unsubLedgerAccounts(this);
theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo);
theApp->getOPs().unsubAccountTransaction(this, mSubAccountTransaction);
}
void WSConnection::send(const Json::Value& jvObj)
@@ -289,14 +293,16 @@ Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest)
const char* pCommand;
doFuncPtr dfpFunc;
} commandsA[] = {
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
{ "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe },
{ "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe },
{ "transaction_subscribe", &WSConnection::doTransactionSubcribe },
{ "transaction_unsubscribe", &WSConnection::doTransactionUnsubscribe },
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
{ "account_transaction_subscribe", &WSConnection::doAccountTransactionSubscribe },
{ "account_transaction_unsubscribe", &WSConnection::doAccountTransactionUnsubscribe },
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
{ "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe },
{ "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe },
{ "transaction_subscribe", &WSConnection::doTransactionSubcribe },
{ "transaction_unsubscribe", &WSConnection::doTransactionUnsubscribe },
};
if (!jvRequest.isMember("command"))
@@ -381,7 +387,7 @@ void WSConnection::doAccountInfoSubscribe(Json::Value& jvResult, const Json::Val
{
jvResult["error"] = "missingField";
}
else if (jvResult["accounts"].empty())
else if (jvRequest["accounts"].empty())
{
jvResult["error"] = "emptySet";
}
@@ -413,7 +419,7 @@ void WSConnection::doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::V
{
jvResult["error"] = "missingField";
}
else if (jvResult["accounts"].empty())
else if (jvRequest["accounts"].empty())
{
jvResult["error"] = "emptySet";
}
@@ -439,6 +445,70 @@ void WSConnection::doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::V
}
}
void WSConnection::doAccountTransactionSubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!jvRequest.isMember("accounts"))
{
jvResult["error"] = "missingField";
}
else if (jvRequest["accounts"].empty())
{
jvResult["error"] = "emptySet";
}
else
{
boost::unordered_set<NewcoinAddress> usnaAccoundIds = parseAccountIds(jvRequest["accounts"]);
if (usnaAccoundIds.empty())
{
jvResult["error"] = "malformedAccount";
}
else
{
boost::mutex::scoped_lock sl(mLock);
BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds)
{
mSubAccountTransaction.insert(naAccountID);
}
theApp->getOPs().subAccountTransaction(this, usnaAccoundIds);
}
}
}
void WSConnection::doAccountTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!jvRequest.isMember("accounts"))
{
jvResult["error"] = "missingField";
}
else if (jvRequest["accounts"].empty())
{
jvResult["error"] = "emptySet";
}
else
{
boost::unordered_set<NewcoinAddress> usnaAccoundIds = parseAccountIds(jvRequest["accounts"]);
if (usnaAccoundIds.empty())
{
jvResult["error"] = "malformedAccount";
}
else
{
boost::mutex::scoped_lock sl(mLock);
BOOST_FOREACH(const NewcoinAddress& naAccountID, usnaAccoundIds)
{
mSubAccountTransaction.erase(naAccountID);
}
theApp->getOPs().unsubAccountTransaction(this, usnaAccoundIds);
}
}
}
void WSConnection::doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!theApp->getOPs().subLedger(this))