Subscribe/Unsubscribe improvements:

* Don't acquire the master lock where it's not needed
* InfoSub tracks RT and validated accounts separately
* Correctly remove accounts from the InfoSub
This commit is contained in:
JoelKatz
2015-03-16 16:47:46 -07:00
committed by Nik Bougalis
parent 2f32910bef
commit 1c2f5d60a5
5 changed files with 96 additions and 73 deletions

View File

@@ -468,10 +468,17 @@ public:
//
void subAccount (
InfoSub::ref ispListener,
const hash_set<RippleAddress>& vnaAccountIDs,
std::uint32_t uLedgerIndex, bool rt);
const hash_set<RippleAddress>& vnaAccountIDs, bool rt);
void unsubAccount (
std::uint64_t uListener, const hash_set<RippleAddress>& vnaAccountIDs,
InfoSub::ref ispListener,
const hash_set<RippleAddress>& vnaAccountIDs,
bool rt);
// Just remove the subscription from the tracking
// not from the InfoSub. Needed for InfoSub destruction
void unsubAccountInternal (
std::uint64_t seq,
const hash_set<RippleAddress>& vnaAccountIDs,
bool rt);
bool subLedger (InfoSub::ref ispListener, Json::Value& jvResult);
@@ -547,7 +554,7 @@ private:
std::unique_ptr <LocalTxs> m_localTX;
std::unique_ptr <FeeVote> m_feeVote;
LockType mLock;
LockType mSubLock;
std::atomic<OperatingMode> mMode;
@@ -1756,7 +1763,7 @@ void NetworkOPsImp::pubServer ()
// list into a local array while holding the lock then release the
// lock and call send on everyone.
//
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
if (!mSubServer.empty ())
{
@@ -2526,7 +2533,7 @@ void NetworkOPsImp::pubProposedTransaction (
Json::Value jvObj = transJson (*stTxn, terResult, false, lpCurrent);
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
auto it = mSubRTTransactions.begin ();
while (it != mSubRTTransactions.end ())
@@ -2558,7 +2565,7 @@ void NetworkOPsImp::pubLedger (Ledger::ref accepted)
Ledger::ref lpAccepted = alpAccepted->getLedger ();
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
if (!mSubLedger.empty ())
{
@@ -2682,7 +2689,7 @@ void NetworkOPsImp::pubValidatedTransaction (
std::string sObj = to_string (jvObj);
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
auto it = mSubTransactions.begin ();
while (it != mSubTransactions.end ())
@@ -2725,7 +2732,7 @@ void NetworkOPsImp::pubAccountTransaction (
int iAccepted = 0;
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
if (!bAccepted && mSubRTAccount.empty ()) return;
@@ -2804,22 +2811,21 @@ void NetworkOPsImp::pubAccountTransaction (
// Monitoring
//
void NetworkOPsImp::subAccount (InfoSub::ref isrListener,
const hash_set<RippleAddress>& vnaAccountIDs,
std::uint32_t uLedgerIndex, bool rt)
void NetworkOPsImp::subAccount (
InfoSub::ref isrListener,
const hash_set<RippleAddress>& vnaAccountIDs, bool rt)
{
SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
// For the connection, monitor each account.
for (auto const& naAccountID : vnaAccountIDs)
{
m_journal.trace << "subAccount:"
" account: " << naAccountID.humanAccountID ();
isrListener->insertSubAccountInfo (naAccountID, uLedgerIndex);
isrListener->insertSubAccountInfo (naAccountID, rt);
}
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
for (auto const& naAccountID : vnaAccountIDs)
{
@@ -2841,21 +2847,29 @@ void NetworkOPsImp::subAccount (InfoSub::ref isrListener,
}
void NetworkOPsImp::unsubAccount (
InfoSub::ref isrListener,
hash_set<RippleAddress> const& vnaAccountIDs,
bool rt)
{
for (auto const& naAccountID : vnaAccountIDs)
{
// Remove from the InfoSub
isrListener->deleteSubAccountInfo(naAccountID, rt);
}
// Remove from the server
unsubAccountInternal (isrListener->getSeq(), vnaAccountIDs, rt);
}
void NetworkOPsImp::unsubAccountInternal (
std::uint64_t uSeq,
hash_set<RippleAddress> const& vnaAccountIDs,
bool rt)
{
ScopedLockType sl (mSubLock);
SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
// For the connection, unmonitor each account.
// FIXME: Don't we need to unsub?
// BOOST_FOREACH(RippleAddress const& naAccountID, vnaAccountIDs)
// {
// isrListener->deleteSubAccountInfo(naAccountID);
// }
ScopedLockType sl (mLock);
for (auto const& naAccountID : vnaAccountIDs)
{
auto simIterator = subMap.find (naAccountID.getAccountID ());
@@ -2945,14 +2959,14 @@ bool NetworkOPsImp::subLedger (InfoSub::ref isrListener, Json::Value& jvResult)
= getApp().getLedgerMaster ().getCompleteLedgers ();
}
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubLedger.emplace (isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubLedger (std::uint64_t uSeq)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubLedger.erase (uSeq);
}
@@ -2976,21 +2990,21 @@ bool NetworkOPsImp::subServer (InfoSub::ref isrListener, Json::Value& jvResult,
jvResult[jss::pubkey_node] = getApp ().getLocalCredentials ().
getNodePublic ().humanNodePublic ();
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubServer.emplace (isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubServer (std::uint64_t uSeq)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubServer.erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subTransactions (InfoSub::ref isrListener)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubTransactions.emplace (
isrListener->getSeq (), isrListener).second;
}
@@ -2998,14 +3012,14 @@ bool NetworkOPsImp::subTransactions (InfoSub::ref isrListener)
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubTransactions (std::uint64_t uSeq)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubTransactions.erase (uSeq);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subRTTransactions (InfoSub::ref isrListener)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubRTTransactions.emplace (
isrListener->getSeq (), isrListener).second;
}
@@ -3013,13 +3027,13 @@ bool NetworkOPsImp::subRTTransactions (InfoSub::ref isrListener)
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubRTTransactions (std::uint64_t uSeq)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
return mSubRTTransactions.erase (uSeq);
}
InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
subRpcMapType::iterator it = mRpcSubMap.find (strUrl);
@@ -3032,7 +3046,7 @@ InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
InfoSub::pointer NetworkOPsImp::addRpcSub (
std::string const& strUrl, InfoSub::ref rspEntry)
{
ScopedLockType sl (mLock);
ScopedLockType sl (mSubLock);
mRpcSubMap.emplace (strUrl, rspEntry);

View File

@@ -61,14 +61,25 @@ public:
Source (char const* name, beast::Stoppable& parent);
public:
// VFALCO TODO Rename the 'rt' parameters to something meaningful.
// For some reason, these were originally called "rt"
// for "real time". They actually refer to whether
// you get transactions as they occur or once their
// results are confirmed
virtual void subAccount (ref ispListener,
const hash_set<RippleAddress>& vnaAccountIDs,
std::uint32_t uLedgerIndex, bool rt) = 0;
bool realTime) = 0;
virtual void unsubAccount (std::uint64_t uListener,
// for normal use, removes from InfoSub and server
virtual void unsubAccount (ref isplistener,
const hash_set<RippleAddress>& vnaAccountIDs,
bool rt) = 0;
bool realTime) = 0;
// for use during InfoSub destruction
// Removes only from the server
virtual void unsubAccountInternal (std::uint64_t uListener,
const hash_set<RippleAddress>& vnaAccountIDs,
bool realTime) = 0;
// VFALCO TODO Document the bool return value
virtual bool subLedger (ref ispListener, Json::Value& jvResult) = 0;
@@ -104,7 +115,7 @@ public:
virtual void send (Json::Value const& jvObj, bool broadcast) = 0;
// VFALCO NOTE Why is this virtual?
// virtual so that a derived class can optimize this case
virtual void send (
Json::Value const& jvObj, std::string const& sObj, bool broadcast);
@@ -112,7 +123,13 @@ public:
void onSendEmpty ();
void insertSubAccountInfo (RippleAddress addr, std::uint32_t uLedgerIndex);
void insertSubAccountInfo (
RippleAddress addr,
bool rt);
void deleteSubAccountInfo (
RippleAddress addr,
bool rt);
void clearPathRequest ();
@@ -128,7 +145,8 @@ protected:
private:
Consumer m_consumer;
Source& m_source;
hash_set <RippleAddress> mSubAccountInfo;
hash_set <RippleAddress> mSubAccountInfo_t; // real time subscriptions
hash_set <RippleAddress> mSubAccountInfo_f; // normal subscriptions
hash_set <RippleAddress> mSubAccountTransaction;
std::shared_ptr <PathRequest> mPathRequest;
std::uint64_t mSeq;

View File

@@ -57,8 +57,16 @@ InfoSub::~InfoSub ()
m_source.unsubRTTransactions (mSeq);
m_source.unsubLedger (mSeq);
m_source.unsubServer (mSeq);
m_source.unsubAccount (mSeq, mSubAccountInfo, true);
m_source.unsubAccount (mSeq, mSubAccountInfo, false);
// Use the internal unsubscribe so that it won't call
// back to us and modify its own parameter
if (! mSubAccountInfo_t.empty ())
m_source.unsubAccountInternal
(mSeq, mSubAccountInfo_t, true);
if (! mSubAccountInfo_t.empty ())
m_source.unsubAccountInternal
(mSeq, mSubAccountInfo_f, false);
}
Resource::Consumer& InfoSub::getConsumer()
@@ -81,12 +89,18 @@ void InfoSub::onSendEmpty ()
{
}
void InfoSub::insertSubAccountInfo (
RippleAddress addr, std::uint32_t uLedgerIndex)
void InfoSub::insertSubAccountInfo (RippleAddress addr, bool rt)
{
ScopedLockType sl (mLock);
mSubAccountInfo.insert (addr);
(rt ? mSubAccountInfo_t : mSubAccountInfo_f).insert (addr);
}
void InfoSub::deleteSubAccountInfo (RippleAddress addr, bool rt)
{
ScopedLockType sl (mLock);
(rt ? mSubAccountInfo_t : mSubAccountInfo_f).erase (addr);
}
void InfoSub::clearPathRequest ()

View File

@@ -28,10 +28,6 @@ Json::Value doSubscribe (RPC::Context& context)
{
InfoSub::pointer ispSub;
Json::Value jvResult (Json::objectValue);
std::uint32_t uLedgerIndex = context.params.isMember (jss::ledger_index)
&& context.params[jss::ledger_index].isNumeric ()
? context.params[jss::ledger_index].asUInt ()
: 0;
if (!context.infoSub && !context.params.isMember (jss::url))
{
@@ -42,10 +38,6 @@ Json::Value doSubscribe (RPC::Context& context)
return rpcError (rpcINVALID_PARAMS);
}
// FIXME:
// Subscriptions need to be protected by their own lock
auto lock = getApp().masterLock();
if (context.params.isMember (jss::url))
{
if (context.role != Role::ADMIN)
@@ -161,7 +153,7 @@ Json::Value doSubscribe (RPC::Context& context)
if (ids.empty ())
jvResult[jss::error] = "malformedAccount";
else
context.netOps.subAccount (ispSub, ids, uLedgerIndex, true);
context.netOps.subAccount (ispSub, ids, true);
}
if (!context.params.isMember (jss::accounts))
@@ -181,13 +173,12 @@ Json::Value doSubscribe (RPC::Context& context)
}
else
{
context.netOps.subAccount (ispSub, ids, uLedgerIndex, false);
context.netOps.subAccount (ispSub, ids, false);
WriteLog (lsDEBUG, RPCHandler)
<< "doSubscribe: accounts: " << ids.size ();
}
}
bool bHaveMasterLock = true;
if (!context.params.isMember (jss::books))
{
}
@@ -286,12 +277,6 @@ Json::Value doSubscribe (RPC::Context& context)
return rpcError (rpcBAD_MARKET);
}
if (!bHaveMasterLock)
{
lock->lock ();
bHaveMasterLock = true;
}
context.netOps.subBook (ispSub, book);
if (bBoth)
@@ -299,12 +284,6 @@ Json::Value doSubscribe (RPC::Context& context)
if (bSnapshot)
{
if (bHaveMasterLock)
{
lock->unlock ();
bHaveMasterLock = false;
}
context.loadType = Resource::feeMediumBurdenRPC;
auto lpLedger = getApp().getLedgerMaster ().
getPublishedLedger ();

View File

@@ -36,8 +36,6 @@ Json::Value doUnsubscribe (RPC::Context& context)
return rpcError (rpcINVALID_PARAMS);
}
auto lock = getApp().masterLock();
if (context.params.isMember (jss::url))
{
if (context.role != Role::ADMIN)
@@ -96,7 +94,7 @@ Json::Value doUnsubscribe (RPC::Context& context)
if (accounts.empty ())
jvResult[jss::error] = "malformedAccount";
else
context.netOps.unsubAccount (ispSub->getSeq (), accounts, true);
context.netOps.unsubAccount (ispSub, accounts, true);
}
if (context.params.isMember (jss::accounts))
@@ -106,7 +104,7 @@ Json::Value doUnsubscribe (RPC::Context& context)
if (accounts.empty ())
jvResult[jss::error] = "malformedAccount";
else
context.netOps.unsubAccount (ispSub->getSeq (), accounts, false);
context.netOps.unsubAccount (ispSub, accounts, false);
}
if (!context.params.isMember (jss::books))