Add validations subscriptions (RIPD-504)

This commit is contained in:
wilsonianb
2015-07-09 09:49:05 -07:00
committed by Nik Bougalis
parent 7c2478480d
commit 5bad2db667
8 changed files with 82 additions and 1 deletions

View File

@@ -420,7 +420,7 @@ ledger's sequence number, it attempts to publish those ledgers, retrieving
them if needed. them if needed.
If there are no new ledgers to publish, `doAdvance` determines if it can If there are no new ledgers to publish, `doAdvance` determines if it can
backfill history. If the publication is not caught up, bakfilling is not backfill history. If the publication is not caught up, backfilling is not
attempted to conserve resources. attempted to conserve resources.
If history can be backfilled, the missing ledger with the highest If history can be backfilled, the missing ledger with the highest

View File

@@ -383,6 +383,9 @@ public:
bool subRTTransactions (InfoSub::ref ispListener) override; bool subRTTransactions (InfoSub::ref ispListener) override;
bool unsubRTTransactions (std::uint64_t uListener) override; bool unsubRTTransactions (std::uint64_t uListener) override;
bool subValidations (InfoSub::ref ispListener) override;
bool unsubValidations (std::uint64_t uListener) override;
InfoSub::pointer findRpcSub (std::string const& strUrl) override; InfoSub::pointer findRpcSub (std::string const& strUrl) override;
InfoSub::pointer addRpcSub ( InfoSub::pointer addRpcSub (
std::string const& strUrl, InfoSub::ref) override; std::string const& strUrl, InfoSub::ref) override;
@@ -421,6 +424,7 @@ private:
bool isAccepted); bool isAccepted);
void pubServer (); void pubServer ();
void pubValidation (STValidation::ref val);
std::string getHostId (bool forAdmin); std::string getHostId (bool forAdmin);
@@ -466,6 +470,7 @@ private:
SubMapType mSubServer; // When server changes connectivity state. SubMapType mSubServer; // When server changes connectivity state.
SubMapType mSubTransactions; // All accepted transactions. SubMapType mSubTransactions; // All accepted transactions.
SubMapType mSubRTTransactions; // All proposed and accepted transactions. SubMapType mSubRTTransactions; // All proposed and accepted transactions.
SubMapType mSubValidations; // Received validations.
std::uint32_t mLastLoadBase; std::uint32_t mLastLoadBase;
std::uint32_t mLastLoadFactor; std::uint32_t mLastLoadFactor;
@@ -1595,6 +1600,39 @@ void NetworkOPsImp::pubServer ()
} }
} }
void NetworkOPsImp::pubValidation (STValidation::ref val)
{
// VFALCO consider std::shared_mutex
ScopedLockType sl (mSubLock);
if (!mSubValidations.empty ())
{
Json::Value jvObj (Json::objectValue);
jvObj [jss::type] = "validationReceived";
jvObj [jss::validation_public_key] = val->getSignerPublic ().humanNodePublic ();
jvObj [jss::ledger_hash] = to_string (val->getLedgerHash ());
jvObj [jss::signature] = strHex (val->getSignature ());
for (auto i = mSubValidations.begin (); i != mSubValidations.end (); )
{
InfoSub::pointer p = i->second.lock ();
if (p)
{
p->send (jvObj, true);
++i;
}
else
{
i = mSubValidations.erase (i);
}
}
}
}
void NetworkOPsImp::setMode (OperatingMode om) void NetworkOPsImp::setMode (OperatingMode om)
{ {
if (om == omCONNECTED) if (om == omCONNECTED)
@@ -1871,6 +1909,7 @@ bool NetworkOPsImp::recvValidation (
{ {
m_journal.debug << "recvValidation " << val->getLedgerHash () m_journal.debug << "recvValidation " << val->getLedgerHash ()
<< " from " << source; << " from " << source;
pubValidation (val);
return getApp().getValidations ().addValidation (val, source); return getApp().getValidations ().addValidation (val, source);
} }
@@ -2557,6 +2596,20 @@ bool NetworkOPsImp::unsubRTTransactions (std::uint64_t uSeq)
return mSubRTTransactions.erase (uSeq); return mSubRTTransactions.erase (uSeq);
} }
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subValidations (InfoSub::ref isrListener)
{
ScopedLockType sl (mSubLock);
return mSubValidations.emplace (isrListener->getSeq (), isrListener).second;
}
// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubValidations (std::uint64_t uSeq)
{
ScopedLockType sl (mSubLock);
return mSubValidations.erase (uSeq);
}
InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl) InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
{ {
ScopedLockType sl (mSubLock); ScopedLockType sl (mSubLock);

View File

@@ -98,6 +98,9 @@ public:
virtual bool subRTTransactions (ref ispListener) = 0; virtual bool subRTTransactions (ref ispListener) = 0;
virtual bool unsubRTTransactions (std::uint64_t uListener) = 0; virtual bool unsubRTTransactions (std::uint64_t uListener) = 0;
virtual bool subValidations (ref ispListener) = 0;
virtual bool unsubValidations (std::uint64_t uListener) = 0;
// VFALCO TODO Remove // VFALCO TODO Remove
// This was added for one particular partner, it // This was added for one particular partner, it
// "pushes" subscription data to a particular URL. // "pushes" subscription data to a particular URL.

View File

@@ -57,6 +57,7 @@ InfoSub::~InfoSub ()
m_source.unsubRTTransactions (mSeq); m_source.unsubRTTransactions (mSeq);
m_source.unsubLedger (mSeq); m_source.unsubLedger (mSeq);
m_source.unsubServer (mSeq); m_source.unsubServer (mSeq);
m_source.unsubValidations (mSeq);
// Use the internal unsubscribe so that it won't call // Use the internal unsubscribe so that it won't call
// back to us and modify its own parameter // back to us and modify its own parameter

View File

@@ -314,6 +314,7 @@ JSS ( seqNum ); // out: LedgerToJson
JSS ( server_state ); // out: NetworkOPs JSS ( server_state ); // out: NetworkOPs
JSS ( server_status ); // out: NetworkOPs JSS ( server_status ); // out: NetworkOPs
JSS ( severity ); // in: LogLevel JSS ( severity ); // in: LogLevel
JSS ( signature ); // out: NetworkOPs
JSS ( snapshot ); // in: Subscribe JSS ( snapshot ); // in: Subscribe
JSS ( source_account ); // in: PathRequest, RipplePathFind JSS ( source_account ); // in: PathRequest, RipplePathFind
JSS ( source_amount ); // in: PathRequest, RipplePathFind JSS ( source_amount ); // in: PathRequest, RipplePathFind

View File

@@ -123,6 +123,10 @@ Json::Value doSubscribe (RPC::Context& context)
{ {
context.netOps.subRTTransactions (ispSub); context.netOps.subRTTransactions (ispSub);
} }
else if (streamName == "validations")
{
context.netOps.subValidations (ispSub);
}
else else
{ {
jvResult[jss::error] = "unknownStream"; jvResult[jss::error] = "unknownStream";

View File

@@ -73,6 +73,9 @@ Json::Value doUnsubscribe (RPC::Context& context)
|| streamName == "rt_transactions") // DEPRECATED || streamName == "rt_transactions") // DEPRECATED
context.netOps.unsubRTTransactions (ispSub->getSeq ()); context.netOps.unsubRTTransactions (ispSub->getSeq ());
else if (streamName == "validations")
context.netOps.unsubValidations (ispSub->getSeq ());
else else
jvResult[jss::error] = "Unknown stream: " + streamName; jvResult[jss::error] = "Unknown stream: " + streamName;
} }

View File

@@ -169,4 +169,20 @@ suite('JSON-RPC', function() {
done(); done();
}); });
}); });
test('subscribe validations', function(done) {
var rippled_config = testutils.get_server_config(config);
var client = jsonrpc.client("http://" + rippled_config.rpc_ip + ":" + rippled_config.rpc_port);
var http_config = config.http_servers["zed"];
client.call('subscribe', [{
'url' : "http://" + http_config.ip + ":" + http_config.port,
'streams' : [ 'validations' ],
}], function (result) {
// console.log(JSON.stringify(result, undefined, 2));
assert(typeof result === 'object');
assert(result.status === 'success');
done();
});
});
}); });