Forward validations and manifests in reporting mode

This commit is contained in:
Nathan Nichols
2021-08-18 18:55:20 -05:00
committed by Nik Bougalis
parent bdfafa0b58
commit d9d001dffd
6 changed files with 74 additions and 13 deletions

View File

@@ -449,6 +449,10 @@ public:
void void
pubValidation(std::shared_ptr<STValidation> const& val) override; pubValidation(std::shared_ptr<STValidation> const& val) override;
void
forwardValidation(Json::Value const& jvObj) override;
void
forwardManifest(Json::Value const& jvObj) override;
void void
forwardProposedTransaction(Json::Value const& jvObj) override; forwardProposedTransaction(Json::Value const& jvObj) override;
void void
@@ -2588,6 +2592,46 @@ NetworkOPsImp::forwardProposedTransaction(Json::Value const& jvObj)
forwardProposedAccountTransaction(jvObj); forwardProposedAccountTransaction(jvObj);
} }
void
NetworkOPsImp::forwardValidation(Json::Value const& jvObj)
{
std::lock_guard sl(mSubLock);
for (auto i = mStreamMaps[sValidations].begin();
i != mStreamMaps[sValidations].end();)
{
if (auto p = i->second.lock())
{
p->send(jvObj, true);
++i;
}
else
{
i = mStreamMaps[sValidations].erase(i);
}
}
}
void
NetworkOPsImp::forwardManifest(Json::Value const& jvObj)
{
std::lock_guard sl(mSubLock);
for (auto i = mStreamMaps[sManifests].begin();
i != mStreamMaps[sManifests].end();)
{
if (auto p = i->second.lock())
{
p->send(jvObj, true);
++i;
}
else
{
i = mStreamMaps[sManifests].erase(i);
}
}
}
static void static void
getAccounts(Json::Value const& jvObj, std::vector<AccountID>& accounts) getAccounts(Json::Value const& jvObj, std::vector<AccountID>& accounts)
{ {

View File

@@ -261,6 +261,10 @@ public:
virtual void virtual void
pubValidation(std::shared_ptr<STValidation> const& val) = 0; pubValidation(std::shared_ptr<STValidation> const& val) = 0;
virtual void
forwardValidation(Json::Value const& jvObj) = 0;
virtual void
forwardManifest(Json::Value const& jvObj) = 0;
virtual void virtual void
forwardProposedTransaction(Json::Value const& jvObj) = 0; forwardProposedTransaction(Json::Value const& jvObj) = 0;
virtual void virtual void

View File

@@ -261,6 +261,10 @@ ETLSource::onHandshake(boost::beast::error_code ec)
jv["streams"].append(ledgerStream); jv["streams"].append(ledgerStream);
Json::Value txnStream("transactions_proposed"); Json::Value txnStream("transactions_proposed");
jv["streams"].append(txnStream); jv["streams"].append(txnStream);
Json::Value validationStream("validations");
jv["streams"].append(validationStream);
Json::Value manifestStream("manifests");
jv["streams"].append(manifestStream);
Json::FastWriter fastWriter; Json::FastWriter fastWriter;
JLOG(journal_.trace()) << "Sending subscribe stream message"; JLOG(journal_.trace()) << "Sending subscribe stream message";
@@ -352,15 +356,28 @@ ETLSource::handleMessage()
} }
else else
{ {
if (response.isMember(jss::transaction)) if (etl_.getETLLoadBalancer().shouldPropagateStream(this))
{ {
if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) if (response.isMember(jss::transaction))
{ {
etl_.getApplication().getOPs().forwardProposedTransaction( etl_.getApplication().getOPs().forwardProposedTransaction(
response); response);
} }
else if (
response.isMember("type") &&
response["type"] == "validationReceived")
{
etl_.getApplication().getOPs().forwardValidation(response);
}
else if (
response.isMember("type") &&
response["type"] == "manifestReceived")
{
etl_.getApplication().getOPs().forwardManifest(response);
}
} }
else
if (response.isMember("type") && response["type"] == "ledgerClosed")
{ {
JLOG(journal_.debug()) JLOG(journal_.debug())
<< __func__ << " : " << __func__ << " : "

View File

@@ -368,13 +368,13 @@ public:
/// Determine whether messages received on the transactions_proposed stream /// Determine whether messages received on the transactions_proposed stream
/// should be forwarded to subscribing clients. The server subscribes to /// should be forwarded to subscribing clients. The server subscribes to
/// transactions_proposed on multiple ETLSources, yet only forwards messages /// transactions_proposed, validations, and manifests on multiple
/// from one source at any given time (to avoid sending duplicate messages /// ETLSources, yet only forwards messages from one source at any given time
/// to clients). /// (to avoid sending duplicate messages to clients).
/// @param in ETLSource in question /// @param in ETLSource in question
/// @return true if messages should be forwarded /// @return true if messages should be forwarded
bool bool
shouldPropagateTxnStream(ETLSource* in) const shouldPropagateStream(ETLSource* in) const
{ {
for (auto& src : sources_) for (auto& src : sources_)
{ {

View File

@@ -370,7 +370,7 @@ public:
continue; continue;
} }
query = {}; query.str("");
query << "SELECT * FROM " << tableName << " LIMIT 1"; query << "SELECT * FROM " << tableName << " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0); statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement); fut = cass_session_execute(session_.get(), statement);
@@ -433,7 +433,7 @@ public:
*/ */
cass_future_free(prepare_future); cass_future_free(prepare_future);
query = {}; query.str("");
query << "SELECT object FROM " << tableName << " WHERE hash = ?"; query << "SELECT object FROM " << tableName << " WHERE hash = ?";
prepare_future = prepare_future =
cass_session_prepare(session_.get(), query.str().c_str()); cass_session_prepare(session_.get(), query.str().c_str());

View File

@@ -138,8 +138,6 @@ doSubscribe(RPC::JsonContext& context)
} }
else if (streamName == "manifests") else if (streamName == "manifests")
{ {
if (context.app.config().reporting())
return rpcError(rpcREPORTING_UNSUPPORTED);
context.netOps.subManifests(ispSub); context.netOps.subManifests(ispSub);
} }
else if (streamName == "transactions") else if (streamName == "transactions")
@@ -154,8 +152,6 @@ doSubscribe(RPC::JsonContext& context)
} }
else if (streamName == "validations") else if (streamName == "validations")
{ {
if (context.app.config().reporting())
return rpcError(rpcREPORTING_UNSUPPORTED);
context.netOps.subValidations(ispSub); context.netOps.subValidations(ispSub);
} }
else if (streamName == "peer_status") else if (streamName == "peer_status")