Work toward subscribe accounts by ledger index.

This commit is contained in:
Arthur Britto
2013-01-05 16:51:45 -08:00
parent b912eeb0af
commit 04c17ac1f3
5 changed files with 80 additions and 54 deletions

View File

@@ -27,6 +27,11 @@
SETUP_LOG();
DECLARE_INSTANCE(InfoSub);
void InfoSub::onSendEmpty()
{
}
NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) :
mMode(omDISCONNECTED), mNeedNetworkLedger(false), mNetTimer(io_service), mLedgerMaster(pLedgerMaster),
mCloseTimeOffset(0), mLastCloseProposers(0), mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL),
@@ -1189,7 +1194,7 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted)
}
}
// we don't lock since pubAcceptedTransaction is locking
// Don't lock since pubAcceptedTransaction is locking.
if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() )
{
SHAMap& txSet = *lpAccepted->peekTransactionMap();
@@ -1241,10 +1246,12 @@ Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, TER terRes
void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult,TransactionMetaSet::pointer& meta)
{
Json::Value jvObj = transJson(stTxn, terResult, true, lpCurrent, "transaction");
if(meta) jvObj["meta"]=meta->getJson(0);
if (meta) jvObj["meta"] = meta->getJson(0);
{
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
BOOST_FOREACH(InfoSub* ispListener, mSubTransactions)
{
ispListener->send(jvObj);
@@ -1256,22 +1263,22 @@ void NetworkOPs::pubAcceptedTransaction(Ledger::ref lpCurrent, const SerializedT
}
}
pubAccountTransaction(lpCurrent,stTxn,terResult,true,meta);
pubAccountTransaction(lpCurrent, stTxn, terResult, true, meta);
}
void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, bool bAccepted,TransactionMetaSet::pointer& meta)
void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTransaction& stTxn, TER terResult, bool bAccepted, TransactionMetaSet::pointer& meta)
{
boost::unordered_set<InfoSub*> notify;
{
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
if(!bAccepted && mSubRTAccount.empty()) return;
if (!bAccepted && mSubRTAccount.empty()) return;
if (!mSubAccount.empty() || (!mSubRTAccount.empty()) )
{
typedef std::map<RippleAddress, bool>::value_type AccountPair;
BOOST_FOREACH(const AccountPair& affectedAccount, getAffectedAccounts(stTxn))
{
subInfoMapIterator simiIt = mSubRTAccount.find(affectedAccount.first.getAccountID());
@@ -1283,7 +1290,8 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr
notify.insert(ispListener);
}
}
if(bAccepted)
if (bAccepted)
{
simiIt = mSubAccount.find(affectedAccount.first.getAccountID());
@@ -1302,7 +1310,8 @@ void NetworkOPs::pubAccountTransaction(Ledger::ref lpCurrent, const SerializedTr
if (!notify.empty())
{
Json::Value jvObj = transJson(stTxn, terResult, bAccepted, lpCurrent, "account");
if(meta) jvObj["meta"]=meta->getJson(0);
if (meta) jvObj["meta"] = meta->getJson(0);
BOOST_FOREACH(InfoSub* ispListener, notify)
{
@@ -1344,12 +1353,15 @@ std::map<RippleAddress,bool> NetworkOPs::getAffectedAccounts(const SerializedTra
// Monitoring
//
void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs,bool rt)
void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, uint32 uLedgerIndex, bool rt)
{
subInfoMapType& subMap=mSubAccount;
if(rt) subMap=mSubRTAccount;
subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
// For the connection, monitor each account.
BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs)
{
ispListener->insertSubAccountInfo(naAccountID, uLedgerIndex);
}
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
@@ -1358,7 +1370,7 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<Rip
subInfoMapType::iterator simIterator = subMap.find(naAccountID.getAccountID());
if (simIterator == subMap.end())
{
// Not found
// Not found, note that account has a new single listner.
boost::unordered_set<InfoSub*> usisElement;
usisElement.insert(ispListener);
@@ -1366,21 +1378,30 @@ void NetworkOPs::subAccount(InfoSub* ispListener, const boost::unordered_set<Rip
}
else
{
// Found
// Found, note that the account has another listener.
simIterator->second.insert(ispListener);
}
}
}
void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs,bool rt)
void NetworkOPs::unsubAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, bool rt)
{
subInfoMapType& subMap= rt ? mSubRTAccount : mSubAccount;
subInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
// For the connection, unmonitor each account.
// FIXME: Don't we need to unsub?
// BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs)
// {
// ispListener->deleteSubAccountInfo(naAccountID);
// }
boost::recursive_mutex::scoped_lock sl(mMonitorLock);
BOOST_FOREACH(const RippleAddress& naAccountID, vnaAccountIDs)
{
subInfoMapType::iterator simIterator = subMap.find(naAccountID.getAccountID());
if (simIterator == mSubAccount.end())
{
// Not found. Done.

View File

@@ -26,12 +26,6 @@ class RPCSub;
class InfoSub : public IS_INSTANCE(InfoSub)
{
public:
virtual ~InfoSub();
virtual void send(const Json::Value& jvObj) = 0;
protected:
boost::unordered_set<RippleAddress> mSubAccountInfo;
boost::unordered_set<RippleAddress> mSubAccountTransaction;
@@ -39,9 +33,17 @@ protected:
boost::mutex mLockInfo;
public:
void insertSubAccountInfo(RippleAddress addr)
virtual ~InfoSub();
virtual void send(const Json::Value& jvObj) = 0;
void onSendEmpty();
void insertSubAccountInfo(RippleAddress addr, uint32 uLedgerIndex)
{
boost::mutex::scoped_lock sl(mLockInfo);
mSubAccountInfo.insert(addr);
}
};
@@ -267,8 +269,8 @@ public:
//
// Monitoring: subscriber side
//
void subAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs,bool rt);
void unsubAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs,bool rt);
void subAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, uint32 uLedgerIndex, bool rt);
void unsubAccount(InfoSub* ispListener, const boost::unordered_set<RippleAddress>& vnaAccountIDs, bool rt);
bool subLedger(InfoSub* ispListener, Json::Value& jvResult);
bool unsubLedger(InfoSub* ispListener);

View File

@@ -514,7 +514,7 @@ Json::Value RPCHandler::doProfile(Json::Value jvRequest)
STAmount(uCurrencyOfferB, naAccountB.getAccountID(), 1+n), // saTakerGets
0); // uExpiration
if(bSubmit)
if (bSubmit)
tpOfferA = mNetOps->submitTransactionSync(tpOfferA); // FIXME: Don't use synch interface
}
@@ -1194,7 +1194,7 @@ Json::Value RPCHandler::doTxHistory(Json::Value jvRequest)
SQL_FOREACH(db, sql)
{
Transaction::pointer trans=Transaction::transactionFromSQL(db, false);
if(trans) txs.append(trans->getJson(0));
if (trans) txs.append(trans->getJson(0));
}
}
@@ -1344,8 +1344,10 @@ Json::Value RPCHandler::doAccountTransactions(Json::Value jvRequest)
for (std::vector< std::pair<Transaction::pointer, TransactionMetaSet::pointer> >::iterator it = txns.begin(), end = txns.end(); it != end; ++it)
{
Json::Value obj(Json::objectValue);
if(it->first) obj["tx"]=it->first->getJson(1);
if(it->second) obj["meta"]=it->second->getJson(0);
if (it->first) obj["tx"] = it->first->getJson(1);
if (it->second) obj["meta"] = it->second->getJson(0);
ret["transactions"].append(obj);
}
return ret;
@@ -2149,6 +2151,9 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
{
InfoSub* ispSub;
Json::Value jvResult(Json::objectValue);
uint32 uLedgerIndex = jvRequest.isMember("ledger_index") && jvRequest["ledger_index"].isNumeric()
? jvRequest["ledger_index"].asUInt()
: 0;
if (!mInfoSub && !jvRequest.isMember("url"))
{
@@ -2235,12 +2240,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
}
else
{
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{
ispSub->insertSubAccountInfo(naAccountID);
}
mNetOps->subAccount(ispSub, usnaAccoundIds, true);
mNetOps->subAccount(ispSub, usnaAccoundIds, uLedgerIndex, true);
}
}
@@ -2254,12 +2254,7 @@ Json::Value RPCHandler::doSubscribe(Json::Value jvRequest)
}
else
{
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{
ispSub->insertSubAccountInfo(naAccountID);
}
mNetOps->subAccount(ispSub, usnaAccoundIds, false);
mNetOps->subAccount(ispSub, usnaAccoundIds, uLedgerIndex, false);
}
}
@@ -2342,12 +2337,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
}
else
{
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{
ispSub->insertSubAccountInfo(naAccountID);
}
mNetOps->unsubAccount(ispSub, usnaAccoundIds,true);
mNetOps->unsubAccount(ispSub, usnaAccoundIds, true);
}
}
@@ -2361,12 +2351,7 @@ Json::Value RPCHandler::doUnsubscribe(Json::Value jvRequest)
}
else
{
BOOST_FOREACH(const RippleAddress& naAccountID, usnaAccoundIds)
{
ispSub->insertSubAccountInfo(naAccountID);
}
mNetOps->unsubAccount(ispSub, usnaAccoundIds,false);
mNetOps->unsubAccount(ispSub, usnaAccoundIds, false);
}
}

View File

@@ -103,6 +103,22 @@ public:
}
}
void on_send_empty(connection_ptr cpClient)
{
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;
wsc_ptr ptr;
{
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
ptr->onSendEmpty();
}
void on_open(connection_ptr cpClient)
{
boost::mutex::scoped_lock sl(mMapLock);