diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index ab9682235..fc7a4778a 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -449,6 +449,10 @@ public: void pubValidation(std::shared_ptr const& val) override; + void + forwardValidation(Json::Value const& jvObj) override; + void + forwardManifest(Json::Value const& jvObj) override; void forwardProposedTransaction(Json::Value const& jvObj) override; void @@ -2588,6 +2592,46 @@ NetworkOPsImp::forwardProposedTransaction(Json::Value const& jvObj) forwardProposedAccountTransaction(jvObj); } +void +NetworkOPsImp::forwardValidation(Json::Value const& jvObj) +{ + std::lock_guard sl(mSubLock); + + for (auto i = mStreamMaps[sValidations].begin(); + i != mStreamMaps[sValidations].end();) + { + if (auto p = i->second.lock()) + { + p->send(jvObj, true); + ++i; + } + else + { + i = mStreamMaps[sValidations].erase(i); + } + } +} + +void +NetworkOPsImp::forwardManifest(Json::Value const& jvObj) +{ + std::lock_guard sl(mSubLock); + + for (auto i = mStreamMaps[sManifests].begin(); + i != mStreamMaps[sManifests].end();) + { + if (auto p = i->second.lock()) + { + p->send(jvObj, true); + ++i; + } + else + { + i = mStreamMaps[sManifests].erase(i); + } + } +} + static void getAccounts(Json::Value const& jvObj, std::vector& accounts) { diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index 444da2cc8..5e00c4cc6 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -261,6 +261,10 @@ public: virtual void pubValidation(std::shared_ptr const& val) = 0; + virtual void + forwardValidation(Json::Value const& jvObj) = 0; + virtual void + forwardManifest(Json::Value const& jvObj) = 0; virtual void forwardProposedTransaction(Json::Value const& jvObj) = 0; virtual void diff --git a/src/ripple/app/reporting/ETLSource.cpp b/src/ripple/app/reporting/ETLSource.cpp index 09eef5a60..5c6d1d07b 100644 --- a/src/ripple/app/reporting/ETLSource.cpp +++ b/src/ripple/app/reporting/ETLSource.cpp @@ -261,6 +261,10 @@ ETLSource::onHandshake(boost::beast::error_code ec) jv["streams"].append(ledgerStream); Json::Value txnStream("transactions_proposed"); jv["streams"].append(txnStream); + Json::Value validationStream("validations"); + jv["streams"].append(validationStream); + Json::Value manifestStream("manifests"); + jv["streams"].append(manifestStream); Json::FastWriter fastWriter; JLOG(journal_.trace()) << "Sending subscribe stream message"; @@ -352,15 +356,28 @@ ETLSource::handleMessage() } else { - if (response.isMember(jss::transaction)) + if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) { - if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) + if (response.isMember(jss::transaction)) { etl_.getApplication().getOPs().forwardProposedTransaction( response); } + else if ( + response.isMember("type") && + response["type"] == "validationReceived") + { + etl_.getApplication().getOPs().forwardValidation(response); + } + else if ( + response.isMember("type") && + response["type"] == "manifestReceived") + { + etl_.getApplication().getOPs().forwardManifest(response); + } } - else + + if (response.isMember("type") && response["type"] == "ledgerClosed") { JLOG(journal_.debug()) << __func__ << " : " diff --git a/src/ripple/app/reporting/ETLSource.h b/src/ripple/app/reporting/ETLSource.h index b4bb5ccf9..66cb61bcc 100644 --- a/src/ripple/app/reporting/ETLSource.h +++ b/src/ripple/app/reporting/ETLSource.h @@ -368,13 +368,13 @@ public: /// Determine whether messages received on the transactions_proposed stream /// should be forwarded to subscribing clients. The server subscribes to - /// transactions_proposed on multiple ETLSources, yet only forwards messages - /// from one source at any given time (to avoid sending duplicate messages - /// to clients). + /// transactions_proposed, validations, and manifests on multiple + /// ETLSources, yet only forwards messages from one source at any given time + /// (to avoid sending duplicate messages to clients). /// @param in ETLSource in question /// @return true if messages should be forwarded bool - shouldPropagateTxnStream(ETLSource* in) const + shouldPropagateStream(ETLSource* in) const { for (auto& src : sources_) { diff --git a/src/ripple/nodestore/backend/CassandraFactory.cpp b/src/ripple/nodestore/backend/CassandraFactory.cpp index 26850e7a6..10282d94b 100644 --- a/src/ripple/nodestore/backend/CassandraFactory.cpp +++ b/src/ripple/nodestore/backend/CassandraFactory.cpp @@ -370,7 +370,7 @@ public: continue; } - query = {}; + query.str(""); query << "SELECT * FROM " << tableName << " LIMIT 1"; statement = makeStatement(query.str().c_str(), 0); fut = cass_session_execute(session_.get(), statement); @@ -433,7 +433,7 @@ public: */ cass_future_free(prepare_future); - query = {}; + query.str(""); query << "SELECT object FROM " << tableName << " WHERE hash = ?"; prepare_future = cass_session_prepare(session_.get(), query.str().c_str()); diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index c11fa7e4d..3cbea6ab6 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -138,8 +138,6 @@ doSubscribe(RPC::JsonContext& context) } else if (streamName == "manifests") { - if (context.app.config().reporting()) - return rpcError(rpcREPORTING_UNSUPPORTED); context.netOps.subManifests(ispSub); } else if (streamName == "transactions") @@ -154,8 +152,6 @@ doSubscribe(RPC::JsonContext& context) } else if (streamName == "validations") { - if (context.app.config().reporting()) - return rpcError(rpcREPORTING_UNSUPPORTED); context.netOps.subValidations(ispSub); } else if (streamName == "peer_status")