Add WS ledger_accounts_subscribe and ledger_accounts_unsubscribe.

This commit is contained in:
Arthur Britto
2012-06-26 16:55:29 -07:00
parent 34273d74d7
commit 1bf80d8c51
3 changed files with 75 additions and 12 deletions

View File

@@ -638,9 +638,7 @@ std::vector< std::pair<uint32, uint256> >
SQL_FOREACH(db, sql)
{
std::string txID;
db->getStr("TransID", txID);
affectedAccounts.push_back(std::make_pair<uint32, uint256>(db->getInt("LedgerSeq"), uint256(txID)));
affectedAccounts.push_back(std::make_pair<uint32, uint256>(db->getInt("LedgerSeq"), uint256(db->getStrBinary("TransID"))));
}
}
@@ -660,9 +658,7 @@ std::vector<NewcoinAddress>
ScopedLock dblock = theApp->getTxnDB()->getDBLock();
SQL_FOREACH(db, sql)
{
std::string str;
db->getStr("Account", str);
if (acct.setAccountID(str))
if (acct.setAccountID(db->getStrBinary("Account")))
accounts.push_back(acct);
}
}
@@ -736,6 +732,34 @@ void NetworkOPs::pubLedger(const Ledger::pointer& lpAccepted)
ispListener->send(jvObj);
}
}
if (!mSubLedgerAccounts.empty())
{
Json::Value jvAccounts(Json::arrayValue);
BOOST_FOREACH(const NewcoinAddress& naAccountID, getAffectedAccounts(lpAccepted->getLedgerSeq()))
{
jvAccounts.append(Json::Value(naAccountID.humanAccountID()));
}
Json::Value jvObj(Json::objectValue);
jvObj["type"] = "ledgerAcceptedAccounts";
jvObj["seq"] = lpAccepted->getLedgerSeq();
jvObj["hash"] = lpAccepted->getHash().ToString();
jvObj["time"] = Json::Value::UInt(lpAccepted->getCloseTimeNC());
jvObj["accounts"] = jvAccounts;
boost::interprocess::sharable_lock<boost::interprocess::interprocess_upgradable_mutex> sl(mMonitorLock);
BOOST_FOREACH(InfoSub* ispListener, mSubLedgerAccounts)
{
ispListener->send(jvObj);
}
}
}
void NetworkOPs::pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const std::vector<NewcoinAddress>& naAffectedAccountIds)
{
}
//
@@ -813,4 +837,16 @@ bool NetworkOPs::unsubLedger(InfoSub* ispListener)
return !!mSubLedger.erase(ispListener);
}
// <-- bool: true=added, false=already there
bool NetworkOPs::subLedgerAccounts(InfoSub* ispListener)
{
return mSubLedgerAccounts.insert(ispListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPs::unsubLedgerAccounts(InfoSub* ispListener)
{
return !!mSubLedgerAccounts.erase(ispListener);
}
// vim:ts=4

View File

@@ -58,7 +58,8 @@ protected:
boost::interprocess::interprocess_upgradable_mutex mMonitorLock;
subInfoMapType mSubAccountInfo;
boost::unordered_set<InfoSub*> mSubLedger;
boost::unordered_set<InfoSub*> mSubLedger; // ledger accepteds
boost::unordered_set<InfoSub*> mSubLedgerAccounts; // ledger accepteds + affected accounts
public:
NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster);
@@ -153,6 +154,7 @@ public:
std::vector< std::pair<uint32, uint256> >
getAffectedAccounts(const NewcoinAddress& account, uint32 minLedger, uint32 maxLedger);
std::vector<NewcoinAddress> getAffectedAccounts(uint32 ledgerSeq);
std::vector<SerializedTransaction> getLedgerTransactions(uint32 ledgerSeq);
//
// Monitoring: publisher side
@@ -160,6 +162,7 @@ public:
void pubAccountInfo(const NewcoinAddress& naAccountID, const Json::Value& jvObj);
void pubLedger(const Ledger::pointer& lpAccepted);
void pubTransaction(const Ledger::pointer& lpCurrent, const SerializedTransaction& stTxn, TransactionEngineResult terResult, const std::vector<NewcoinAddress>& naAffectedAccountIds);
//
// Monitoring: subscriber side
@@ -174,6 +177,9 @@ public:
bool subLedger(InfoSub* ispListener);
bool unsubLedger(InfoSub* ispListener);
bool subLedgerAccounts(InfoSub* ispListener);
bool unsubLedgerAccounts(InfoSub* ispListener);
};
#endif

View File

@@ -77,8 +77,11 @@ public:
// Commands
void doAccountInfoSubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doAccountInfoUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerSubcribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerAccountsSubcribe(Json::Value& jvResult, const Json::Value& jvRequest);
void doLedgerAccountsUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest);
};
@@ -265,6 +268,7 @@ void WSDoor::stop()
WSConnection::~WSConnection()
{
theApp->getOPs().unsubLedger(this);
theApp->getOPs().unsubLedgerAccounts(this);
theApp->getOPs().unsubAccountInfo(this, mSubAccountInfo);
}
@@ -283,13 +287,14 @@ Json::Value WSConnection::invokeCommand(const Json::Value& jvRequest)
const char* pCommand;
doFuncPtr dfpFunc;
} commandsA[] = {
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
{ "account_info_subscribe", &WSConnection::doAccountInfoSubscribe },
{ "account_info_unsubscribe", &WSConnection::doAccountInfoUnsubscribe },
{ "ledger_subscribe", &WSConnection::doLedgerSubcribe },
{ "ledger_unsubscribe", &WSConnection::doLedgerUnsubscribe },
{ "ledger_accounts_subscribe", &WSConnection::doLedgerAccountsSubcribe },
{ "ledger_accounts_unsubscribe", &WSConnection::doLedgerAccountsUnsubscribe },
};
if (!jvRequest.isMember("command"))
{
Json::Value jvResult(Json::objectValue);
@@ -446,4 +451,20 @@ void WSConnection::doLedgerUnsubscribe(Json::Value& jvResult, const Json::Value&
}
}
void WSConnection::doLedgerAccountsSubcribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!theApp->getOPs().subLedgerAccounts(this))
{
jvResult["error"] = "ledgerAccountsSubscribed";
}
}
void WSConnection::doLedgerAccountsUnsubscribe(Json::Value& jvResult, const Json::Value& jvRequest)
{
if (!theApp->getOPs().unsubLedgerAccounts(this))
{
jvResult["error"] = "ledgerAccountsNotSubscribed";
}
}
// vim:ts=4