Implement extractor tests (#671)

This commit is contained in:
Alex Kremer
2023-06-07 12:33:46 +01:00
committed by GitHub
parent 244337c5b6
commit 5d2c079f1a
12 changed files with 349 additions and 49 deletions

View File

@@ -19,12 +19,11 @@
#pragma once
#include <etl/SystemState.h>
#include <log/Logger.h>
#include <util/Profiler.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <chrono>
#include <mutex>
@@ -89,11 +88,7 @@ private:
double totalTime = 0.0;
auto currentSequence = startSequence_;
// Two stopping conditions:
// - if there is a write conflict in the load thread, the ETL mechanism should stop.
// - if the entire server is shutting down - this can be detected in a variety of ways.
while ((not finishSequence_ || currentSequence <= *finishSequence_) && not state_.get().writeConflict &&
not isStopping() && networkValidatedLedgers_->waitUntilValidatedByNetwork(currentSequence))
while (!shouldFinish(currentSequence) && networkValidatedLedgers_->waitUntilValidatedByNetwork(currentSequence))
{
auto [fetchResponse, time] = util::timed<std::chrono::duration<double>>(
[this, currentSequence]() { return ledgerFetcher_.get().fetchDataAndDiff(currentSequence); });
@@ -101,11 +96,12 @@ private:
// if the fetch is unsuccessful, stop. fetchLedger only returns false if the server is shutting down, or if
// the ledger was found in the database (which means another process already wrote the ledger that this
// process was trying to extract; this is a form of a write conflict). Otherwise, fetchLedgerDataAndDiff
// will keep trying to fetch the specified ledger until successful.
// process was trying to extract; this is a form of a write conflict).
// Otherwise, fetchDataAndDiff will keep trying to fetch the specified ledger until successful.
if (!fetchResponse)
break;
// TODO: extract this part into a strategy perhaps
auto const tps = fetchResponse->transactions_list().transactions_size() / time;
log_.info() << "Extract phase time = " << time << "; Extract phase tps = " << tps
<< "; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1)
@@ -113,9 +109,6 @@ private:
pipe_.get().push(currentSequence, std::move(fetchResponse));
currentSequence += pipe_.get().getStride();
if (finishSequence_ && currentSequence > *finishSequence_)
break;
}
pipe_.get().finish(startSequence_);
@@ -126,6 +119,22 @@ private:
{
return state_.get().isStopping;
}
bool
hasWriteConflict() const
{
return state_.get().writeConflict;
}
bool
shouldFinish(uint32_t seq) const
{
// Stopping conditions:
// - if there is a write conflict in the load thread, the ETL mechanism should stop.
// - if the entire server is shutting down - this can be detected in a variety of ways.
// - when the given sequence is past the finishSequence in case one is specified
return hasWriteConflict() || isStopping() || (finishSequence_ && seq > *finishSequence_);
}
};
} // namespace clio::detail