etl pipeline

This commit is contained in:
CJ Cobb
2021-03-24 04:29:54 -04:00
parent c69fb9672e
commit a7a71ec0c7
3 changed files with 11 additions and 14 deletions

View File

@@ -1515,13 +1515,6 @@ public:
throw std::runtime_error("decrementing num outstanding below 0"); throw std::runtime_error("decrementing num outstanding below 0");
} }
size_t cur = (--numRequestsOutstanding_); size_t cur = (--numRequestsOutstanding_);
// sanity check
if (!canAddRequest())
{
assert(false);
throw std::runtime_error(
"decremented num outstanding but can't add more");
}
{ {
// mutex lock required to prevent race condition around spurious // mutex lock required to prevent race condition around spurious
// wakeup // wakeup

View File

@@ -317,7 +317,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
// Database must be populated when this starts // Database must be populated when this starts
std::optional<uint32_t> std::optional<uint32_t>
ReportingETL::runETLPipeline(uint32_t startSequence) ReportingETL::runETLPipeline(uint32_t startSequence, int offset)
{ {
/* /*
* Behold, mortals! This function spawns three separate threads, which talk * Behold, mortals! This function spawns three separate threads, which talk
@@ -363,9 +363,10 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
std::thread extracter{[this, std::thread extracter{[this,
&startSequence, &startSequence,
&writeConflict, &writeConflict,
&transformQueue]() { &transformQueue,
&offset]() {
beast::setCurrentThreadName("rippled: ReportingETL extract"); beast::setCurrentThreadName("rippled: ReportingETL extract");
uint32_t currentSequence = startSequence; uint32_t currentSequence = startSequence + offset;
// there are two stopping conditions here. // there are two stopping conditions here.
// First, if there is a write conflict in the load thread, the ETL // First, if there is a write conflict in the load thread, the ETL
@@ -401,7 +402,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
} }
transformQueue.push(std::move(fetchResponse)); transformQueue.push(std::move(fetchResponse));
++currentSequence; currentSequence += offset;
} }
// empty optional tells the transformer to shut down // empty optional tells the transformer to shut down
transformQueue.push({}); transformQueue.push({});
@@ -598,8 +599,11 @@ ReportingETL::monitor()
<< " . Beginning ETL"; << " . Beginning ETL";
// doContinousETLPipelined returns the most recent sequence // doContinousETLPipelined returns the most recent sequence
// published empty optional if no sequence was published // published empty optional if no sequence was published
std::optional<uint32_t> lastPublished = std::optional<uint32_t> lastPublished = nextSequence;
runETLPipeline(nextSequence); for (size_t i = 0; i < 10; ++i)
{
runETLPipeline(nextSequence, i);
}
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << " : " << __func__ << " : "
<< "Aborting ETL. Falling back to publishing"; << "Aborting ETL. Falling back to publishing";

View File

@@ -166,7 +166,7 @@ private:
/// @param startSequence the first ledger to extract /// @param startSequence the first ledger to extract
/// @return the last ledger written to the database, if any /// @return the last ledger written to the database, if any
std::optional<uint32_t> std::optional<uint32_t>
runETLPipeline(uint32_t startSequence); runETLPipeline(uint32_t startSequence, int offset);
/// Monitor the network for newly validated ledgers. Also monitor the /// Monitor the network for newly validated ledgers. Also monitor the
/// database to see if any process is writing those ledgers. This function /// database to see if any process is writing those ledgers. This function