From 5bad2db667d8953e97d4ea49da05c708c025fb2e Mon Sep 17 00:00:00 2001 From: wilsonianb Date: Thu, 9 Jul 2015 09:49:05 -0700 Subject: [PATCH] Add validations subscriptions (RIPD-504) --- src/ripple/app/ledger/README.md | 2 +- src/ripple/app/misc/NetworkOPs.cpp | 53 +++++++++++++++++++++++++ src/ripple/net/InfoSub.h | 3 ++ src/ripple/net/impl/InfoSub.cpp | 1 + src/ripple/protocol/JsonFields.h | 1 + src/ripple/rpc/handlers/Subscribe.cpp | 4 ++ src/ripple/rpc/handlers/Unsubscribe.cpp | 3 ++ test/jsonrpc-test.js | 16 ++++++++ 8 files changed, 82 insertions(+), 1 deletion(-) diff --git a/src/ripple/app/ledger/README.md b/src/ripple/app/ledger/README.md index b132688bd..d57e7044e 100644 --- a/src/ripple/app/ledger/README.md +++ b/src/ripple/app/ledger/README.md @@ -420,7 +420,7 @@ ledger's sequence number, it attempts to publish those ledgers, retrieving them if needed. If there are no new ledgers to publish, `doAdvance` determines if it can -backfill history. If the publication is not caught up, bakfilling is not +backfill history. If the publication is not caught up, backfilling is not attempted to conserve resources. If history can be backfilled, the missing ledger with the highest diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 2e9777f1a..cb23ebe62 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -383,6 +383,9 @@ public: bool subRTTransactions (InfoSub::ref ispListener) override; bool unsubRTTransactions (std::uint64_t uListener) override; + bool subValidations (InfoSub::ref ispListener) override; + bool unsubValidations (std::uint64_t uListener) override; + InfoSub::pointer findRpcSub (std::string const& strUrl) override; InfoSub::pointer addRpcSub ( std::string const& strUrl, InfoSub::ref) override; @@ -421,6 +424,7 @@ private: bool isAccepted); void pubServer (); + void pubValidation (STValidation::ref val); std::string getHostId (bool forAdmin); @@ -466,6 +470,7 @@ private: SubMapType mSubServer; // When server changes connectivity state. SubMapType mSubTransactions; // All accepted transactions. SubMapType mSubRTTransactions; // All proposed and accepted transactions. + SubMapType mSubValidations; // Received validations. std::uint32_t mLastLoadBase; std::uint32_t mLastLoadFactor; @@ -1595,6 +1600,39 @@ void NetworkOPsImp::pubServer () } } + +void NetworkOPsImp::pubValidation (STValidation::ref val) +{ + // VFALCO consider std::shared_mutex + ScopedLockType sl (mSubLock); + + if (!mSubValidations.empty ()) + { + Json::Value jvObj (Json::objectValue); + + jvObj [jss::type] = "validationReceived"; + jvObj [jss::validation_public_key] = val->getSignerPublic ().humanNodePublic (); + jvObj [jss::ledger_hash] = to_string (val->getLedgerHash ()); + jvObj [jss::signature] = strHex (val->getSignature ()); + + for (auto i = mSubValidations.begin (); i != mSubValidations.end (); ) + { + InfoSub::pointer p = i->second.lock (); + + if (p) + { + p->send (jvObj, true); + ++i; + } + else + { + i = mSubValidations.erase (i); + } + } + } +} + + void NetworkOPsImp::setMode (OperatingMode om) { if (om == omCONNECTED) @@ -1871,6 +1909,7 @@ bool NetworkOPsImp::recvValidation ( { m_journal.debug << "recvValidation " << val->getLedgerHash () << " from " << source; + pubValidation (val); return getApp().getValidations ().addValidation (val, source); } @@ -2557,6 +2596,20 @@ bool NetworkOPsImp::unsubRTTransactions (std::uint64_t uSeq) return mSubRTTransactions.erase (uSeq); } +// <-- bool: true=added, false=already there +bool NetworkOPsImp::subValidations (InfoSub::ref isrListener) +{ + ScopedLockType sl (mSubLock); + return mSubValidations.emplace (isrListener->getSeq (), isrListener).second; +} + +// <-- bool: true=erased, false=was not there +bool NetworkOPsImp::unsubValidations (std::uint64_t uSeq) +{ + ScopedLockType sl (mSubLock); + return mSubValidations.erase (uSeq); +} + InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl) { ScopedLockType sl (mSubLock); diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index cee6c389c..8124d271f 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -98,6 +98,9 @@ public: virtual bool subRTTransactions (ref ispListener) = 0; virtual bool unsubRTTransactions (std::uint64_t uListener) = 0; + virtual bool subValidations (ref ispListener) = 0; + virtual bool unsubValidations (std::uint64_t uListener) = 0; + // VFALCO TODO Remove // This was added for one particular partner, it // "pushes" subscription data to a particular URL. diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index a92a9119f..e787c7b4c 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -57,6 +57,7 @@ InfoSub::~InfoSub () m_source.unsubRTTransactions (mSeq); m_source.unsubLedger (mSeq); m_source.unsubServer (mSeq); + m_source.unsubValidations (mSeq); // Use the internal unsubscribe so that it won't call // back to us and modify its own parameter diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index ce3427fd2..8c69cc762 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -314,6 +314,7 @@ JSS ( seqNum ); // out: LedgerToJson JSS ( server_state ); // out: NetworkOPs JSS ( server_status ); // out: NetworkOPs JSS ( severity ); // in: LogLevel +JSS ( signature ); // out: NetworkOPs JSS ( snapshot ); // in: Subscribe JSS ( source_account ); // in: PathRequest, RipplePathFind JSS ( source_amount ); // in: PathRequest, RipplePathFind diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index 52587abfb..d16d244c2 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -123,6 +123,10 @@ Json::Value doSubscribe (RPC::Context& context) { context.netOps.subRTTransactions (ispSub); } + else if (streamName == "validations") + { + context.netOps.subValidations (ispSub); + } else { jvResult[jss::error] = "unknownStream"; diff --git a/src/ripple/rpc/handlers/Unsubscribe.cpp b/src/ripple/rpc/handlers/Unsubscribe.cpp index ec74afb8c..858c6acf9 100644 --- a/src/ripple/rpc/handlers/Unsubscribe.cpp +++ b/src/ripple/rpc/handlers/Unsubscribe.cpp @@ -73,6 +73,9 @@ Json::Value doUnsubscribe (RPC::Context& context) || streamName == "rt_transactions") // DEPRECATED context.netOps.unsubRTTransactions (ispSub->getSeq ()); + else if (streamName == "validations") + context.netOps.unsubValidations (ispSub->getSeq ()); + else jvResult[jss::error] = "Unknown stream: " + streamName; } diff --git a/test/jsonrpc-test.js b/test/jsonrpc-test.js index bb9210b95..4d7d40eb4 100644 --- a/test/jsonrpc-test.js +++ b/test/jsonrpc-test.js @@ -169,4 +169,20 @@ suite('JSON-RPC', function() { done(); }); }); + + test('subscribe validations', function(done) { + var rippled_config = testutils.get_server_config(config); + var client = jsonrpc.client("http://" + rippled_config.rpc_ip + ":" + rippled_config.rpc_port); + var http_config = config.http_servers["zed"]; + + client.call('subscribe', [{ + 'url' : "http://" + http_config.ip + ":" + http_config.port, + 'streams' : [ 'validations' ], + }], function (result) { + // console.log(JSON.stringify(result, undefined, 2)); + assert(typeof result === 'object'); + assert(result.status === 'success'); + done(); + }); + }); });