Add WS transaction_subscribe and transaction_unsubscribe support.

This commit is contained in:
Arthur Britto
2012-06-26 20:45:19 -07:00
parent 1bf80d8c51
commit 03342cacd8
3 changed files with 90 additions and 33 deletions

View File

@@ -717,49 +717,68 @@ void NetworkOPs::pubAccountInfo(const NewcoinAddress& naAccountID, const Json::V
void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted)
{
if (!mSubLedger.empty())
{
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "ledgerAccepted";
jvObj["seq"] = lpAccepted->getLedgerSeq();
jvObj["hash"] = lpAccepted->getHash().ToString();
jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC());
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
BOOST_FOREACH(InfoSub* ispListener, mSubLedger)
if (!mSubLedger.empty())
{
ispListener->send(jvObj);
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "ledgerAccepted";
jvObj["seq"] = lpAccepted->getLedgerSeq();
jvObj["hash"] = lpAccepted->getHash().ToString();
jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC());
BOOST_FOREACH(InfoSub* ispListener, mSubLedger)
{
ispListener->send(jvObj);
}
}
}
if (!mSubLedgerAccounts.empty())
{
Json::Value jvAccounts(Json::arrayValue);
BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq()))
{
jvAccounts.append(Json::Value(naAccountID.humanAccountID()));
}
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "ledgerAcceptedAccounts";
jvObj["seq"] = lpAccepted->getLedgerSeq();
jvObj["hash"] = lpAccepted->getHash().ToString();
jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC());
jvObj["accounts"] = jvAccounts;
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
BOOST_FOREACH(InfoSub* ispListener, mSubLedgerAccounts)
if (!mSubLedgerAccounts.empty())
{
ispListener->send(jvObj);
Json::Value jvAccounts(Json::arrayValue);
BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq()))
{
jvAccounts.append(Json::Value(naAccountID.humanAccountID()));
}
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "ledgerAcceptedAccounts";
jvObj["seq"] = lpAccepted->getLedgerSeq();
jvObj["hash"] = lpAccepted->getHash().ToString();
jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC());
jvObj["accounts"] = jvAccounts;
BOOST_FOREACH(InfoSub* ispListener, mSubLedgerAccounts)
{
ispListener->send(jvObj);
}
}
}
}
void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const std::vector<NewcoinAddress>& naAffectedAccountIds)
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
if (!mSubTransaction.empty())
{
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "transactionProposed";
jvObj["seq"] = lpCurrent->getLedgerSeq();
jvObj["transaction"] = stTxn.getJson(0); // XXX Verify what options there are.
BOOST_FOREACH(InfoSub* ispListener, mSubTransaction)
{
ispListener->send(jvObj);
}
}
}
//
@@ -849,4 +868,16 @@ bool NetworkOPs::unsubLedgerAccounts(InfoSub* ispListener)
return !!mSubLedgerAccounts.erase(ispListener);
}
// <-- bool: true=added, false=already there
bool NetworkOPs::subTransaction(InfoSub* ispListener)
{
return mSubTransaction.insert(ispListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPs::unsubTransaction(InfoSub* ispListener)
{
return !!mSubTransaction.erase(ispListener);
}
// vim:ts=4

View File

@@ -56,10 +56,13 @@ protected:
typedef boost::unordered_map<NewcoinAddress,boost::unordered_set<InfoSub*> > subInfoMapType;
typedef boost::unordered_map<NewcoinAddress,boost::unordered_set<InfoSub*> >::iterator subInfoMapIterator;
// XXX Split into more locks.
boost::interprocess::interprocess_upgradable_mutex mMonitorLock;
subInfoMapType mSubAccountInfo;
boost::unordered_set<InfoSub*> mSubLedger; // ledger accepteds
boost::unordered_set<InfoSub*> mSubLedgerAccounts; // ledger accepteds + affected accounts
boost::unordered_set<InfoSub*> mSubTransaction; // all transactions
subInfoMapType mSubTransactionAccounts;
public:
NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster);
@@ -180,6 +183,9 @@ public:
bool subLedgerAccounts(InfoSub* ispListener);
bool unsubLedgerAccounts(InfoSub* ispListener);
bool subTransaction(InfoSub* ispListener);
bool unsubTransaction(InfoSub* ispListener);
};
#endif

View File

@@ -82,9 +82,10 @@ public:
void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerAccountsSubcribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerAccountsUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doTransactionSubcribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
};
// A single instance of this object is made.
// This instance dispatches all events. There is no per connection persistence.
template <typename endpoint_type>
@@ -267,6 +268,7 @@ void WSDoor::stop()
WSConnection::~WSConnection()
{
theApp->getOPs().unsubTransaction(this);
theApp->getOPs().unsubLedger(this);
theApp->getOPs().unsubLedgerAccounts(this);
theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo);
@@ -287,12 +289,14 @@ Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest)
const char* pCommand;
doFuncPtr dfpFunc;
} commandsA[] = {
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
{ "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe },
{ "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe },
{ "transaction_subscribe", &WSConnection::doTransactionSubcribe },
{ "transaction_unsubscribe", &WSConnection::doTransactionUnsubscribe },
};
if (!jvRequest.isMember("command"))
@@ -467,4 +471,20 @@ void WSConnection::doLedgerAccountsUnsubscribe(Json::Value& jvResult, const Json
}
}
void WSConnection::doTransactionSubcribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!theApp->getOPs().subTransaction(this))
{
jvResult["error"] = "TransactionsSubscribed";
}
}
void WSConnection::doTransactionUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!theApp->getOPs().unsubTransaction(this))
{
jvResult["error"] = "TransactionsNotSubscribed";
}
}
// vim:ts=4