diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 0c4ac057..a6f65e97 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -757,8 +757,9 @@ CassandraBackend::open() continue; query = {}; - query << " update " << tablePrefix << "ledger_range" - << " set sequence = ? where is_latest = ?"; + query + << " update " << tablePrefix << "ledger_range" + << " set sequence = ? where is_latest = ? if sequence in (?,null)"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 86c41384..5451bc13 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -751,11 +751,13 @@ public: CassandraStatement statement{updateLedgerRange_}; statement.bindInt(ledgerSequence_); statement.bindBoolean(false); + statement.bindInt(ledgerSequence_); executeSyncWrite(statement); } CassandraStatement statement{updateLedgerRange_}; statement.bindInt(ledgerSequence_); statement.bindBoolean(true); + statement.bindInt(ledgerSequence_ - 1); return executeSyncUpdate(statement); } void diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 876c4ebc..1fbc6416 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -863,6 +863,8 @@ BEGIN RETURN NEW; END; $$ LANGUAGE plpgsql; +CREATE TRIGGER verify_ancestry BEFORE INSERT OR UPDATE on ledgers + FOR EACH ROW EXECUTE PROCEDURE insert_ancestry(); -- Trigger function prior to delete on ledgers table. Disallow gaps from -- forming. Do not allow deletions if both the previous and next ledgers diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 245a9137..be1f77b8 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -581,7 +581,7 @@ PostgresBackend::finishWrites() const accountTxBuffer_.str(""); accountTxBuffer_.clear(); numRowsInObjectsBuffer_ = 0; - return true; + return !abortWrite_; } bool PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 75c48e8e..709f5b48 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -461,20 +461,25 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) auto end = std::chrono::system_clock::now(); auto duration = ((end - start).count()) / 1000000000.0; - BOOST_LOG_TRIVIAL(info) - << "Load phase of etl : " - << "Successfully published ledger! Ledger info: " - << detail::toString(lgrInfo) << ". txn count = " << numTxns - << ". object count = " << numObjects - << ". load time = " << duration - << ". load txns per second = " << numTxns / duration - << ". load objs per second = " << numObjects / duration; + if (success) + BOOST_LOG_TRIVIAL(info) + << "Load phase of etl : " + << "Successfully published ledger! Ledger info: " + << detail::toString(lgrInfo) << ". txn count = " << numTxns + << ". object count = " << numObjects + << ". load time = " << duration + << ". load txns per second = " << numTxns / duration + << ". load objs per second = " << numObjects / duration; + else + BOOST_LOG_TRIVIAL(error) + << "Error writing ledger. " << detail::toString(lgrInfo); // success is false if the ledger was already written if (success) { publishLedger(lgrInfo); lastPublishedSequence = lgrInfo.seq; } + writeConflict = !success; auto range = flatMapBackend_->fetchLedgerRange(); if (onlineDeleteInterval_ && !deleting_ && range->maxSequence - range->minSequence > @@ -626,8 +631,8 @@ ReportingETL::monitor() << " . Beginning ETL"; // doContinousETLPipelined returns the most recent sequence // published empty optional if no sequence was published - std::optional lastPublished = nextSequence; - runETLPipeline(nextSequence, extractorThreads_); + std::optional lastPublished = + runETLPipeline(nextSequence, extractorThreads_); BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Aborting ETL. Falling back to publishing";