add ancestry check

CJ Cobb
2021-03-24 14:46:48 -04:00
parent c74f9654af
commit 14fd6d24ad
5 changed files with 23 additions and 13 deletions


@@ -757,8 +757,9 @@ CassandraBackend::open()
         continue;
     query = {};
-    query << " update " << tablePrefix << "ledger_range"
-          << " set sequence = ? where is_latest = ?";
+    query
+        << " update " << tablePrefix << "ledger_range"
+        << " set sequence = ? where is_latest = ? if sequence in (?,null)";
     if (!updateLedgerRange_.prepareStatement(query, session_.get()))
         continue;
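
The "if sequence in (?,null)" clause turns this UPDATE into a Cassandra lightweight transaction: the write lands only when the stored sequence equals the bound value or is still unset, and the outcome comes back in an [applied] result column rather than as an error. A minimal sketch of reading that flag with the DataStax C/C++ driver follows; the helper name and the surrounding session/statement wiring are illustrative, not the backend's actual code.

    // Hedged sketch: reading the [applied] column of a conditional UPDATE.
    #include <cassandra.h>

    bool
    updateWasApplied(CassSession* session, CassStatement* statement)
    {
        CassFuture* future = cass_session_execute(session, statement);
        const CassResult* result = cass_future_get_result(future);
        cass_future_free(future);
        if (!result)
            return false;  // driver-level error (timeout, unavailable, ...)

        // The single row of an LWT result carries the [applied] flag in column 0.
        const CassRow* row = cass_result_first_row(result);
        cass_bool_t applied = cass_false;
        if (row)
            cass_value_get_bool(cass_row_get_column(row, 0), &applied);
        cass_result_free(result);
        return applied == cass_true;
    }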


@@ -751,11 +751,13 @@ public:
             CassandraStatement statement{updateLedgerRange_};
             statement.bindInt(ledgerSequence_);
             statement.bindBoolean(false);
+            statement.bindInt(ledgerSequence_);
             executeSyncWrite(statement);
         }
         CassandraStatement statement{updateLedgerRange_};
         statement.bindInt(ledgerSequence_);
         statement.bindBoolean(true);
+        statement.bindInt(ledgerSequence_ - 1);
         return executeSyncUpdate(statement);
     }
     void
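
With the conditional clause in place, each statement binds one extra value: ledgerSequence_ for the is_latest = false row and ledgerSequence_ - 1 (the parent) for the is_latest = true row, and the result of executeSyncUpdate is returned so callers can see whether the range actually advanced. A small sketch of the invariant this enforces, with a hypothetical helper name and assuming the condition is the one prepared above:

    #include <cstdint>
    #include <optional>

    // Hedged sketch of the ancestry invariant behind the extra bind.
    // storedSequence stands for the value currently in ledger_range.
    bool
    mayAdvanceLatest(std::optional<uint32_t> storedSequence, uint32_t ledgerSequence)
    {
        // "if sequence in (?,null)" with ? bound to ledgerSequence - 1:
        // apply when the row is unset (first ledger) or holds the direct parent.
        return !storedSequence || *storedSequence == ledgerSequence - 1;
    }

A competing writer that has already advanced the row past the parent makes the condition fail, so finishWrites() reports the conflict instead of silently overwriting the range.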


@@ -863,6 +863,8 @@ BEGIN
     RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
+CREATE TRIGGER verify_ancestry BEFORE INSERT OR UPDATE on ledgers
+    FOR EACH ROW EXECUTE PROCEDURE insert_ancestry();
 -- Trigger function prior to delete on ledgers table. Disallow gaps from
 -- forming. Do not allow deletions if both the previous and next ledgers
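
The new verify_ancestry trigger runs the existing insert_ancestry() function on every INSERT or UPDATE of ledgers, so a row whose parent does not line up with what is already stored is rejected inside the database (only the tail of insert_ancestry() is visible in this hunk, so the exact check is assumed). From the writer's side that rejection surfaces as a failed command. A hedged sketch using plain libpq, which the backend's Pg wrapper is assumed to sit on top of:

    #include <libpq-fe.h>
    #include <iostream>

    // Hedged sketch: a ledger insert rejected by the trigger comes back as a
    // failed command, which the writer can treat as a conflict rather than fatal.
    bool
    insertLedger(PGconn* conn, char const* insertSql)
    {
        PGresult* res = PQexec(conn, insertSql);
        bool const ok = PQresultStatus(res) == PGRES_COMMAND_OK;
        if (!ok)
            std::cerr << "ledger rejected: " << PQresultErrorMessage(res);
        PQclear(res);
        return ok;  // false => ancestry check failed (or some other error)
    }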


@@ -581,7 +581,7 @@ PostgresBackend::finishWrites() const
     accountTxBuffer_.str("");
     accountTxBuffer_.clear();
     numRowsInObjectsBuffer_ = 0;
-    return true;
+    return !abortWrite_;
 }
 bool
 PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
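
finishWrites() now reports whether anything went wrong during the batch instead of always claiming success; abortWrite_ is presumably set earlier, when a write such as the ledger header insert is rejected. A compact sketch of that flag pattern, with hypothetical class and method names:

    // Hedged sketch of the abort-flag pattern: remember a rejected write,
    // keep flushing buffers, and report the outcome at the end of the batch.
    class LedgerWriter
    {
        bool abortWrite_ = false;

    public:
        void
        onLedgerHeaderInsert(bool accepted)
        {
            if (!accepted)
                abortWrite_ = true;  // record the conflict; don't throw mid-batch
        }

        bool
        finishWrites()
        {
            bool const ok = !abortWrite_;
            abortWrite_ = false;  // reset for the next ledger
            return ok;
        }
    };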


@@ -461,6 +461,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
     auto end = std::chrono::system_clock::now();
     auto duration = ((end - start).count()) / 1000000000.0;
+    if (success)
     BOOST_LOG_TRIVIAL(info)
         << "Load phase of etl : "
         << "Successfully published ledger! Ledger info: "
@@ -469,12 +470,16 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< ". load time = " << duration
<< ". load txns per second = " << numTxns / duration
<< ". load objs per second = " << numObjects / duration;
else
BOOST_LOG_TRIVIAL(error)
<< "Error writing ledger. " << detail::toString(lgrInfo);
// success is false if the ledger was already written
if (success)
{
publishLedger(lgrInfo);
lastPublishedSequence = lgrInfo.seq;
}
writeConflict = !success;
auto range = flatMapBackend_->fetchLedgerRange();
if (onlineDeleteInterval_ && !deleting_ &&
range->maxSequence - range->minSequence >
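
In the load phase, success now reflects whether the backend accepted the ledger (the conditional range update or the ancestry trigger can reject it), so publishing and lastPublishedSequence are gated on it, and a failure both logs an error and raises writeConflict to stop the pipeline. A simplified sketch of that bookkeeping, with stand-in types:

    #include <cstdint>
    #include <optional>

    // Simplified stand-ins for the ETL state touched in this hunk.
    struct LoadOutcome
    {
        std::optional<uint32_t> lastPublished;
        bool writeConflict = false;
    };

    // success: whether the backend accepted the ledger (false on a conflict).
    void
    recordLoadResult(LoadOutcome& out, bool success, uint32_t seq)
    {
        if (success)
            out.lastPublished = seq;  // only track ledgers we actually wrote
        out.writeConflict = !success;  // a conflict stops the pipeline
    }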
@@ -626,7 +631,7 @@ ReportingETL::monitor()
<< " . Beginning ETL";
// doContinousETLPipelined returns the most recent sequence
// published empty optional if no sequence was published
std::optional<uint32_t> lastPublished = nextSequence;
std::optional<uint32_t> lastPublished =
runETLPipeline(nextSequence, extractorThreads_);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
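
monitor() now keeps the value returned by runETLPipeline: the last sequence the pipeline managed to publish, or an empty optional if it published nothing, for instance after an immediate write conflict. One plausible way that value could feed the next iteration, sketched with a hypothetical helper (the actual resume logic is outside this hunk and is an assumption here):

    #include <cstdint>
    #include <optional>

    // Hedged sketch: continue after the last published ledger if the pipeline
    // made progress, otherwise retry from where we left off.
    uint32_t
    resumeFrom(std::optional<uint32_t> lastPublished, uint32_t nextSequence)
    {
        return lastPublished ? *lastPublished + 1 : nextSequence;
    }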