proper parallel extraction

This commit is contained in:
CJ Cobb
2021-03-24 05:33:27 -04:00
parent a7a71ec0c7
commit 6eb87bfaff
4 changed files with 65 additions and 57 deletions

View File

@@ -758,7 +758,7 @@ CassandraBackend::open()
query = {}; query = {};
query << " update " << tablePrefix << "ledger_range" query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence != ?"; << " set sequence = ? where is_latest = ?";
if (!updateLedgerRange_.prepareStatement(query, session_.get())) if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue; continue;

View File

@@ -751,13 +751,11 @@ public:
CassandraStatement statement{updateLedgerRange_}; CassandraStatement statement{updateLedgerRange_};
statement.bindInt(ledgerSequence_); statement.bindInt(ledgerSequence_);
statement.bindBoolean(false); statement.bindBoolean(false);
statement.bindInt(ledgerSequence_);
executeSyncWrite(statement); executeSyncWrite(statement);
} }
CassandraStatement statement{updateLedgerRange_}; CassandraStatement statement{updateLedgerRange_};
statement.bindInt(ledgerSequence_); statement.bindInt(ledgerSequence_);
statement.bindBoolean(true); statement.bindBoolean(true);
statement.bindInt(ledgerSequence_);
return executeSyncUpdate(statement); return executeSyncUpdate(statement);
} }
void void

View File

@@ -110,7 +110,7 @@ class ThreadSafeQueue
public: public:
/// @param maxSize maximum size of the queue. Calls that would cause the /// @param maxSize maximum size of the queue. Calls that would cause the
/// queue to exceed this size will block until free space is available /// queue to exceed this size will block until free space is available
explicit ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize) ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
{ {
} }

View File

@@ -317,7 +317,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
// Database must be populated when this starts // Database must be populated when this starts
std::optional<uint32_t> std::optional<uint32_t>
ReportingETL::runETLPipeline(uint32_t startSequence, int offset) ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
{ {
/* /*
* Behold, mortals! This function spawns three separate threads, which talk * Behold, mortals! This function spawns three separate threads, which talk
@@ -356,68 +356,80 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int offset)
std::optional<uint32_t> lastPublishedSequence; std::optional<uint32_t> lastPublishedSequence;
constexpr uint32_t maxQueueSize = 1000; constexpr uint32_t maxQueueSize = 1000;
auto begin = std::chrono::system_clock::now(); auto begin = std::chrono::system_clock::now();
using QueueType =
ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>>;
std::vector<std::shared_ptr<QueueType>> queues;
ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>> auto getNext = [&queues, &startSequence, &numExtractors](
transformQueue{maxQueueSize}; uint32_t sequence) -> std::shared_ptr<QueueType> {
std::cout << std::to_string((sequence - startSequence) % numExtractors);
return queues[(sequence - startSequence) % numExtractors];
};
std::vector<std::thread> threads;
for (size_t i = 1; i < numExtractors + 1; ++i)
{
auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
queues.push_back(transformQueue);
std::cout << "added to queues";
std::thread extracter{[this, threads.emplace_back(
&startSequence, [this, &startSequence, &writeConflict, transformQueue, &i]() {
&writeConflict, beast::setCurrentThreadName("rippled: ReportingETL extract");
&transformQueue, uint32_t currentSequence = startSequence + i;
&offset]() {
beast::setCurrentThreadName("rippled: ReportingETL extract");
uint32_t currentSequence = startSequence + offset;
// there are two stopping conditions here. // there are two stopping conditions here.
// First, if there is a write conflict in the load thread, the ETL // First, if there is a write conflict in the load thread, the
// mechanism should stop. // ETL mechanism should stop. The other stopping condition is if
// The other stopping condition is if the entire server is shutting // the entire server is shutting down. This can be detected in a
// down. This can be detected in a variety of ways. See the comment // variety of ways. See the comment at the top of the function
// at the top of the function while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
while (networkValidatedLedgers_.waitUntilValidatedByNetwork( currentSequence) &&
currentSequence) && !writeConflict && !isStopping())
!writeConflict && !isStopping()) {
{ auto start = std::chrono::system_clock::now();
auto start = std::chrono::system_clock::now(); std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{ fetchResponse{fetchLedgerDataAndDiff(currentSequence)};
fetchLedgerDataAndDiff(currentSequence)}; auto end = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now();
auto time = ((end - start).count()) / 1000000000.0; auto time = ((end - start).count()) / 1000000000.0;
auto tps = auto tps =
fetchResponse->transactions_list().transactions_size() / time; fetchResponse->transactions_list().transactions_size() /
time;
BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time
<< " . Extract phase tps = " << tps; << " . Extract phase tps = " << tps;
// if the fetch is unsuccessful, stop. fetchLedger only returns // if the fetch is unsuccessful, stop. fetchLedger only
// false if the server is shutting down, or if the ledger was // returns false if the server is shutting down, or if the
// found in the database (which means another process already // ledger was found in the database (which means another
// wrote the ledger that this process was trying to extract; // process already wrote the ledger that this process was
// this is a form of a write conflict). Otherwise, // trying to extract; this is a form of a write conflict).
// fetchLedgerDataAndDiff will keep trying to fetch the // Otherwise, fetchLedgerDataAndDiff will keep trying to
// specified ledger until successful // fetch the specified ledger until successful
if (!fetchResponse) if (!fetchResponse)
{ {
break; break;
} }
transformQueue.push(std::move(fetchResponse)); transformQueue->push(std::move(fetchResponse));
currentSequence += offset; currentSequence += i;
} }
// empty optional tells the transformer to shut down // empty optional tells the transformer to shut down
transformQueue.push({}); transformQueue->push({});
}}; });
}
std::thread transformer{[this, std::thread transformer{[this,
&writeConflict, &writeConflict,
&transformQueue, &startSequence,
&getNext,
&lastPublishedSequence]() { &lastPublishedSequence]() {
beast::setCurrentThreadName("rippled: ReportingETL transform"); beast::setCurrentThreadName("rippled: ReportingETL transform");
uint32_t currentSequence = startSequence;
while (!writeConflict) while (!writeConflict)
{ {
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{ std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
transformQueue.pop()}; getNext(currentSequence)->pop()};
// if fetchResponse is an empty optional, the extracter thread // if fetchResponse is an empty optional, the extracter thread
// has stopped and the transformer should stop as well // has stopped and the transformer should stop as well
if (!fetchResponse) if (!fetchResponse)
@@ -467,7 +479,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int offset)
}}; }};
// wait for all of the threads to stop // wait for all of the threads to stop
extracter.join(); for (auto& t : threads)
t.join();
transformer.join(); transformer.join();
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
@@ -600,10 +613,7 @@ ReportingETL::monitor()
// doContinousETLPipelined returns the most recent sequence // doContinousETLPipelined returns the most recent sequence
// published empty optional if no sequence was published // published empty optional if no sequence was published
std::optional<uint32_t> lastPublished = nextSequence; std::optional<uint32_t> lastPublished = nextSequence;
for (size_t i = 0; i < 10; ++i) runETLPipeline(nextSequence, 10);
{
runETLPipeline(nextSequence, i);
}
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << " : " << __func__ << " : "
<< "Aborting ETL. Falling back to publishing"; << "Aborting ETL. Falling back to publishing";