Merge branch 'parallel_etl' into master_new

Author: CJ Cobb
Date:   2021-04-28 19:55:08 +00:00
10 changed files with 329 additions and 195 deletions

View File

@@ -11,13 +11,14 @@ def getTime(line):
     return timestamp.timestamp()

-def parseLogs(filename, interval):
+def parseLogs(filename, interval, minTxnCount = 0):
     with open(filename) as f:
         totalTime = 0
         totalTxns = 0
         totalObjs = 0
+        totalLoadTime = 0
         start = 0
@@ -27,6 +28,7 @@ def parseLogs(filename, interval):
         intervalTime = 0
         intervalTxns = 0
         intervalObjs = 0
+        intervalLoadTime = 0
         intervalStart = 0
         intervalEnd = 0
@@ -52,6 +54,7 @@ def parseLogs(filename, interval):
             loadTime = line[loadTimeIdx + len(loadTimeSubstr):txnsIdx]
             txnsPerSecond = line[txnsIdx + len(txnsSubstr):objsIdx]
             objsPerSecond = line[objsIdx + len(objsSubstr):-1]
+            if int(txnCount) >= minTxnCount:
                 totalTime += float(loadTime);
                 totalTxns += float(txnCount)
                 totalObjs += float(objCount)
@@ -59,6 +62,8 @@ def parseLogs(filename, interval):
                 intervalTxns += float(txnCount)
                 intervalObjs += float(objCount)
+                totalLoadTime += float(loadTime)
+                intervalLoadTime += float(loadTime)
             if start == 0:
@@ -86,22 +91,26 @@ def parseLogs(filename, interval):
+            if int(sequence) % interval == 0:
                 print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]")
                 print(loadTime + " : "
                     + txnCount + " : "
                     + objCount + " : "
                     + txnsPerSecond + " : "
                     + objsPerSecond)
-            print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, txPerSec, objsPerSec]: ")
+            print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ")
             print(str(intervalLedgers) + " : "
                 + str(intervalEnd - intervalStart) + " : "
                 + str(intervalLedgersPerSecond) + " : "
+                + str(intervalLoadTime/intervalLedgers) + " : "
                 + str(intervalTxns/intervalTime) + " : "
                 + str(intervalObjs/intervalTime))
-            print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, txPerSec, objsPerSec]")
+            print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]")
             print(str(totalLedgers) + " : "
                 + str(end-start) + " : "
                 + str(ledgersPerSecond) + " : "
+                + str(totalLoadTime/totalLedgers) + " : "
                 + str(totalTxns/totalTime) + " : "
                 + str(totalObjs/totalTime))
             if int(sequence) % interval == 0:
@@ -111,6 +120,7 @@ def parseLogs(filename, interval):
                 intervalStart = 0
                 intervalEnd = 0
                 intervalLedgers = 0
+                intervalLoadTime = 0
@@ -118,10 +128,11 @@ def parseLogs(filename, interval):
 parser = argparse.ArgumentParser(description='parses logs')
 parser.add_argument("--filename")
 parser.add_argument("--interval",default=100000)
+parser.add_argument("--minTxnCount",default=0)
 args = parser.parse_args()

 def run(args):
-    parseLogs(args.filename, int(args.interval))
+    parseLogs(args.filename, int(args.interval), int(args.minTxnCount))

 run(args)
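Note: with --minTxnCount set, ledgers below the transaction threshold are excluded from the aggregates, and both summaries gain an avgLoadTime column. A minimal sketch of that filtering, using hypothetical (loadTime, txnCount) records in place of parsed log lines:

# Sketch only: "records" is a hypothetical list of (loadTime, txnCount) pairs.
def aggregate(records, minTxnCount=0):
    totalLoadTime = 0.0
    ledgers = 0
    for loadTime, txnCount in records:
        if txnCount >= minTxnCount:  # skip ledgers too small to be representative
            totalLoadTime += loadTime
            ledgers += 1
    return totalLoadTime / ledgers if ledgers else 0.0  # the new avgLoadTime

# aggregate([(0.5, 20), (0.01, 1)], minTxnCount=10) -> 0.5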

View File

@@ -369,13 +369,15 @@ CassandraBackend::open()
         cass_cluster_set_credentials(
             cluster, username.c_str(), getString("password").c_str());
     }
-    unsigned int const workers = std::thread::hardware_concurrency();
-    rc = cass_cluster_set_num_threads_io(cluster, workers);
+    int threads = config_.contains("threads")
+        ? config_["threads"].as_int64()
+        : std::thread::hardware_concurrency();
+    rc = cass_cluster_set_num_threads_io(cluster, threads);
     if (rc != CASS_OK)
     {
         std::stringstream ss;
-        ss << "nodestore: Error setting Cassandra io threads to " << workers
+        ss << "nodestore: Error setting Cassandra io threads to " << threads
            << ", result: " << rc << ", " << cass_error_desc(rc);
         throw std::runtime_error(ss.str());
     }
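Note: the driver's io thread count now comes from the config when present, with hardware concurrency as the fallback. The same lookup pattern, sketched in Python with a hypothetical config dict standing in for the server's JSON configuration:

import os

def io_threads(config):
    # explicit "threads" wins; otherwise use the machine's concurrency
    return config.get("threads") or os.cpu_count() or 1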
@@ -538,19 +540,6 @@ CassandraBackend::open()
             continue;
         query = {};
-        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
-              << " ( key blob, created bigint, deleted bigint, PRIMARY KEY "
-                 "(key, created)) with clustering order by (created "
-                 "desc) ";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-        query = {};
-        query << "SELECT * FROM " << tablePrefix << "keys"
-              << " LIMIT 1";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-        query = {};
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
               << " ( book blob, sequence bigint, key blob, deleted_at "
                  "bigint, PRIMARY KEY "
@@ -636,12 +625,6 @@ CassandraBackend::open()
         if (!insertTransaction_.prepareStatement(query, session_.get()))
             continue;
-        query = {};
-        query << "INSERT INTO " << tablePrefix << "keys"
-              << " (key, created, deleted) VALUES (?, ?, ?)";
-        if (!insertKey_.prepareStatement(query, session_.get()))
-            continue;
         query = {};
         query << "INSERT INTO " << tablePrefix << "books"
               << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
@@ -653,12 +636,6 @@ CassandraBackend::open()
         if (!deleteBook_.prepareStatement(query, session_.get()))
             continue;
-        query = {};
-        query << "SELECT created FROM " << tablePrefix << "keys"
-              << " WHERE key = ? ORDER BY created desc LIMIT 1";
-        if (!getCreated_.prepareStatement(query, session_.get()))
-            continue;
         query = {};
         query << "SELECT object, sequence FROM " << tablePrefix << "objects"
               << " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
@@ -689,16 +666,7 @@ CassandraBackend::open()
             continue;
         query = {};
-        query << "SELECT key FROM " << tablePrefix << "keys "
-              << " WHERE TOKEN(key) >= ? and created <= ?"
-              << " and deleted > ?"
-              << " PER PARTITION LIMIT 1 LIMIT ?"
-              << " ALLOW FILTERING";
-        if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
-            continue;
-        query = {};
-        query << "SELECT key,object FROM " << tablePrefix << "objects "
+        query << "SELECT object,key FROM " << tablePrefix << "objects "
               << " WHERE TOKEN(key) >= ? and sequence <= ? "
               << " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
@@ -757,8 +725,9 @@ CassandraBackend::open()
             continue;
         query = {};
-        query << " update " << tablePrefix << "ledger_range"
-              << " set sequence = ? where is_latest = ? if sequence != ?";
+        query
+            << " update " << tablePrefix << "ledger_range"
+            << " set sequence = ? where is_latest = ? if sequence in (?,null)";
         if (!updateLedgerRange_.prepareStatement(query, session_.get()))
             continue;
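Note: changing the condition from "if sequence != ?" to "if sequence in (?,null)" turns this lightweight transaction into a compare-and-swap; together with the "ledgerSequence_ - 1" binding shown later in this commit, a writer only advances ledger_range if the stored sequence is its immediate predecessor (or unset, for the very first write). A sketch of those semantics, with a plain dict standing in for the ledger_range table:

def try_advance_latest(store, new_seq):
    current = store.get("sequence")        # None if never written
    if current is None or current == new_seq - 1:
        store["sequence"] = new_seq        # this writer wins
        return True
    return False                           # conflict: another writer got there first

# Two processes racing to write ledger N: exactly one returns True; the
# loser treats this as a write conflict and abandons its ETL pipeline.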
@@ -784,6 +753,7 @@ CassandraBackend::open()
         setupPreparedStatements = true;
     }
+    /*
     while (true)
     {
         std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -820,6 +790,7 @@ CassandraBackend::open()
         }
         break;
     }
+    */
     if (config_.contains("max_requests_outstanding"))
     {

View File

@@ -385,6 +385,26 @@ public:
         curGetIndex_++;
         return {buf, buf + bufSize};
     }
+    /*
+    uint32_t
+    getNumBytes()
+    {
+        if (!row_)
+            throw std::runtime_error("CassandraResult::getBytes - no result");
+        cass_byte_t const* buf;
+        std::size_t bufSize;
+        CassError rc = cass_value_get_bytes(
+            cass_row_get_column(row_, curGetIndex_), &buf, &bufSize);
+        if (rc != CASS_OK)
+        {
+            std::stringstream msg;
+            msg << "CassandraResult::getBytes - error getting value: " << rc
+                << ", " << cass_error_desc(rc);
+            BOOST_LOG_TRIVIAL(error) << msg.str();
+            throw std::runtime_error(msg.str());
+        }
+        return bufSize;
+    }
+    */

     ripple::uint256
     getUInt256()
@@ -759,7 +779,7 @@ public:
         CassandraStatement statement{updateLedgerRange_};
         statement.bindInt(ledgerSequence_);
         statement.bindBoolean(true);
-        statement.bindInt(ledgerSequence_);
+        statement.bindInt(ledgerSequence_ - 1);
         return executeSyncUpdate(statement);
     }
     void
@@ -979,11 +999,11 @@ public:
             size_t prevSize = objects.size();
             do
             {
-                ripple::uint256 key = result.getUInt256();
                 std::vector<unsigned char> object = result.getBytes();
                 if (object.size())
                 {
-                    objects.push_back({std::move(key), std::move(object)});
+                    objects.push_back(
+                        {result.getUInt256(), std::move(object)});
                 }
             } while (result.nextRow());
             size_t prevBatchSize = objects.size() - prevSize;
@@ -997,17 +1017,7 @@ public:
             }
             if (objects.size() < limit)
             {
-                BOOST_LOG_TRIVIAL(debug)
-                    << __func__
-                    << " cur limit = " << std::to_string(curLimit)
-                    << " , numRows = " << std::to_string(prevBatchSize);
-                double sparsity =
-                    (double)(curLimit + 1) / (double)(prevBatchSize + 1);
-                curLimit = (limit - objects.size()) * sparsity;
-                BOOST_LOG_TRIVIAL(debug)
-                    << __func__
-                    << " - sparsity = " << std::to_string(sparsity)
-                    << " , curLimit = " << std::to_string(curLimit);
+                curLimit = 2048;
             }
             assert(objects.size());
             currentCursor = objects[objects.size() - 1].key;
@@ -1517,13 +1527,6 @@ public:
             throw std::runtime_error("decrementing num outstanding below 0");
         }
         size_t cur = (--numRequestsOutstanding_);
-        // sanity check
-        if (!canAddRequest())
-        {
-            assert(false);
-            throw std::runtime_error(
-                "decremented num outstanding but can't add more");
-        }
         {
             // mutex lock required to prevent race condition around spurious
             // wakeup

View File

@@ -110,7 +110,7 @@ class ThreadSafeQueue
 public:
     /// @param maxSize maximum size of the queue. Calls that would cause the
     /// queue to exceed this size will block until free space is available
-    explicit ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
+    ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
    {
    }
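Note: the contract documented above is that of a standard bounded blocking queue; Python's queue.Queue gives the same behavior, shown here for intuition:

import queue

q = queue.Queue(maxsize=4)   # like ThreadSafeQueue{4}
q.put("ledger")              # blocks once 4 items are already queued
item = q.get()               # blocks until an item is available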

View File

@@ -622,7 +622,7 @@ ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
             auto [status, data] =
                 source->fetchLedger(ledgerSequence, getObjects);
             response = std::move(data);
-            if (status.ok() && response.validated())
+            if (status.ok() && (response.validated() || true))
             {
                 BOOST_LOG_TRIVIAL(info)
                     << "Successfully fetched ledger = " << ledgerSequence
@@ -819,7 +819,7 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
                 << __func__ << " : "
                 << "Attempting to execute func. ledger sequence = "
                 << ledgerSequence << " - source = " << source->toString();
-            if (source->hasLedger(ledgerSequence))
+            if (source->hasLedger(ledgerSequence) || true)
             {
                 bool res = f(source);
                 if (res)

View File

@@ -863,6 +863,8 @@ BEGIN
     RETURN NEW;
 END;
 $$ LANGUAGE plpgsql;
+CREATE TRIGGER verify_ancestry BEFORE INSERT OR UPDATE on ledgers
+    FOR EACH ROW EXECUTE PROCEDURE insert_ancestry();

 -- Trigger function prior to delete on ledgers table. Disallow gaps from
 -- forming. Do not allow deletions if both the previous and next ledgers

View File

@@ -581,7 +581,7 @@ PostgresBackend::finishWrites() const
     accountTxBuffer_.str("");
     accountTxBuffer_.clear();
     numRowsInObjectsBuffer_ = 0;
-    return true;
+    return !abortWrite_;
 }
 bool
 PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const

View File

@@ -154,9 +154,9 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
     size_t numAttempts = 0;
     while (!stopping_)
     {
-        auto ledger = flatMapBackend_->fetchLedgerBySequence(ledgerSequence);
-        if (!ledger)
+        auto range = flatMapBackend_->fetchLedgerRange();
+        if (!range || range->maxSequence < ledgerSequence)
         {
             BOOST_LOG_TRIVIAL(warning)
                 << __func__ << " : "
@@ -317,8 +317,10 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
 // Database must be populated when this starts
 std::optional<uint32_t>
-ReportingETL::runETLPipeline(uint32_t startSequence)
+ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
 {
+    if (startSequence > finishSequence_)
+        return {};
     /*
      * Behold, mortals! This function spawns three separate threads, which talk
      * to each other via 2 different thread safe queues and 1 atomic variable.
@@ -354,69 +356,99 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
     std::atomic_bool writeConflict = false;
     std::optional<uint32_t> lastPublishedSequence;
-    constexpr uint32_t maxQueueSize = 1000;
+    uint32_t maxQueueSize = 1000 / numExtractors;
     auto begin = std::chrono::system_clock::now();
-    ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>>
-        transformQueue{maxQueueSize};
+    using QueueType =
+        ThreadSafeQueue<std::optional<org::xrpl::rpc::v1::GetLedgerResponse>>;
+    std::vector<std::shared_ptr<QueueType>> queues;
+    auto getNext = [&queues, &startSequence, &numExtractors](
+                       uint32_t sequence) -> std::shared_ptr<QueueType> {
+        std::cout << std::to_string((sequence - startSequence) % numExtractors);
+        return queues[(sequence - startSequence) % numExtractors];
+    };
+    std::vector<std::thread> threads;
+    for (size_t i = 0; i < numExtractors; ++i)
+    {
+        auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
+        queues.push_back(transformQueue);
+        std::cout << "added to queues";
-    std::thread extracter{[this,
-                           &startSequence,
-                           &writeConflict,
-                           &transformQueue]() {
+        threads.emplace_back([this,
+                              &startSequence,
+                              &writeConflict,
+                              transformQueue,
+                              i,
+                              numExtractors]() {
         beast::setCurrentThreadName("rippled: ReportingETL extract");
-        uint32_t currentSequence = startSequence;
+        uint32_t currentSequence = startSequence + i;
+        double totalTime = 0;
-        // there are two stopping conditions here.
-        // First, if there is a write conflict in the load thread, the ETL
-        // mechanism should stop.
-        // The other stopping condition is if the entire server is shutting
-        // down. This can be detected in a variety of ways. See the comment
-        // at the top of the function
-        while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
-                   currentSequence) &&
-               !writeConflict && !isStopping())
+        // there are two stopping conditions here.
+        // First, if there is a write conflict in the load thread, the
+        // ETL mechanism should stop. The other stopping condition is if
+        // the entire server is shutting down. This can be detected in a
+        // variety of ways. See the comment at the top of the function
+        while (currentSequence <= finishSequence_ &&
+               networkValidatedLedgers_.waitUntilValidatedByNetwork(
+                   currentSequence) &&
+               !writeConflict && !isStopping())
         {
             auto start = std::chrono::system_clock::now();
-            std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
-                fetchLedgerDataAndDiff(currentSequence)};
+            std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
+                fetchResponse{fetchLedgerDataAndDiff(currentSequence)};
             auto end = std::chrono::system_clock::now();
             auto time = ((end - start).count()) / 1000000000.0;
+            totalTime += time;
-            auto tps =
-                fetchResponse->transactions_list().transactions_size() / time;
-            BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time
-                                    << " . Extract phase tps = " << tps;
+            auto tps =
+                fetchResponse->transactions_list().transactions_size() /
+                time;
+            BOOST_LOG_TRIVIAL(info)
+                << "Extract phase time = " << time
+                << " . Extract phase tps = " << tps
+                << " . Avg extract time = "
+                << totalTime / (currentSequence - startSequence + 1)
+                << " . thread num = " << i
+                << " . seq = " << currentSequence;
-            // if the fetch is unsuccessful, stop. fetchLedger only returns
-            // false if the server is shutting down, or if the ledger was
-            // found in the database (which means another process already
-            // wrote the ledger that this process was trying to extract;
-            // this is a form of a write conflict). Otherwise,
-            // fetchLedgerDataAndDiff will keep trying to fetch the
-            // specified ledger until successful
+            // if the fetch is unsuccessful, stop. fetchLedger only
+            // returns false if the server is shutting down, or if the
+            // ledger was found in the database (which means another
+            // process already wrote the ledger that this process was
+            // trying to extract; this is a form of a write conflict).
+            // Otherwise, fetchLedgerDataAndDiff will keep trying to
+            // fetch the specified ledger until successful
             if (!fetchResponse)
             {
                 break;
             }
-            transformQueue.push(std::move(fetchResponse));
-            ++currentSequence;
+            transformQueue->push(std::move(fetchResponse));
+            currentSequence += numExtractors;
+            if (currentSequence > finishSequence_)
+                break;
         }
         // empty optional tells the transformer to shut down
-        transformQueue.push({});
-    }};
+        transformQueue->push({});
+    });
+    }
     std::thread transformer{[this,
                              &writeConflict,
-                             &transformQueue,
+                             &startSequence,
+                             &getNext,
                              &lastPublishedSequence]() {
         beast::setCurrentThreadName("rippled: ReportingETL transform");
+        uint32_t currentSequence = startSequence;
         while (!writeConflict)
         {
             std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
-                transformQueue.pop()};
+                getNext(currentSequence)->pop()};
+            ++currentSequence;
             // if fetchResponse is an empty optional, the extracter thread
             // has stopped and the transformer should stop as well
             if (!fetchResponse)
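Note: the reworked pipeline shards extraction round-robin across numExtractors threads, each feeding its own bounded queue, while the single transformer drains the queues in sequence order, so ledgers are still applied strictly sequentially. A minimal Python sketch of that scheduling, with hypothetical fetch/apply callables standing in for the gRPC and database work:

import queue
import threading

def run_pipeline(start, finish, num_extractors, fetch, apply):
    queues = [queue.Queue(maxsize=1000 // num_extractors)
              for _ in range(num_extractors)]

    def extract(i):
        # extractor i handles start+i, start+i+N, start+i+2N, ...
        seq = start + i
        while seq <= finish:
            queues[i].put(fetch(seq))   # blocks while this shard's queue is full
            seq += num_extractors
        queues[i].put(None)             # empty marker: tells the consumer to stop

    threads = [threading.Thread(target=extract, args=(i,))
               for i in range(num_extractors)]
    for t in threads:
        t.start()

    seq = start
    while True:
        # the queue holding sequence seq is (seq - start) % N
        data = queues[(seq - start) % num_extractors].get()
        if data is None:
            break
        apply(seq, data)                # strictly in ledger order
        seq += 1
    for t in threads:
        t.join()

# run_pipeline(1, 9, 3, fetch=lambda s: s, apply=lambda s, d: print(s))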
@@ -434,6 +466,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
             auto end = std::chrono::system_clock::now();
             auto duration = ((end - start).count()) / 1000000000.0;
+            if (success)
                 BOOST_LOG_TRIVIAL(info)
                     << "Load phase of etl : "
                     << "Successfully published ledger! Ledger info: "
@@ -442,12 +475,16 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
                     << ". load time = " << duration
                     << ". load txns per second = " << numTxns / duration
                     << ". load objs per second = " << numObjects / duration;
+            else
+                BOOST_LOG_TRIVIAL(error)
+                    << "Error writing ledger. " << detail::toString(lgrInfo);
             // success is false if the ledger was already written
             if (success)
             {
                 publishLedger(lgrInfo);
                 lastPublishedSequence = lgrInfo.seq;
             }
+            writeConflict = !success;
             auto range = flatMapBackend_->fetchLedgerRange();
             if (onlineDeleteInterval_ && !deleting_ &&
                 range->maxSequence - range->minSequence >
@@ -466,7 +503,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
     }};

     // wait for all of the threads to stop
-    extracter.join();
+    for (auto& t : threads)
+        t.join();
     transformer.join();
     auto end = std::chrono::system_clock::now();
     BOOST_LOG_TRIVIAL(debug)
@@ -599,7 +637,7 @@ ReportingETL::monitor()
             // doContinousETLPipelined returns the most recent sequence
             // published empty optional if no sequence was published
             std::optional<uint32_t> lastPublished =
-                runETLPipeline(nextSequence);
+                runETLPipeline(nextSequence, extractorThreads_);
             BOOST_LOG_TRIVIAL(info)
                 << __func__ << " : "
                 << "Aborting ETL. Falling back to publishing";
@@ -659,9 +697,13 @@ ReportingETL::ReportingETL(
     flatMapBackend_->open();
     if (config.contains("start_sequence"))
         startSequence_ = config.at("start_sequence").as_int64();
+    if (config.contains("finish_sequence"))
+        finishSequence_ = config.at("finish_sequence").as_int64();
     if (config.contains("read_only"))
         readOnly_ = config.at("read_only").as_bool();
     if (config.contains("online_delete"))
         onlineDeleteInterval_ = config.at("online_delete").as_int64();
+    if (config.contains("extractor_threads"))
+        extractorThreads_ = config.at("extractor_threads").as_int64();
 }
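Note: two new config knobs appear here alongside the existing ones. A hypothetical config fragment exercising them, sketched as a Python dict; the key names come from the lookups above, the values are examples only:

etl_config = {
    "start_sequence": 60000000,   # existing: first ledger to extract
    "finish_sequence": 60001000,  # new: stop the pipeline after this ledger
    "extractor_threads": 4,       # new: number of parallel extractor threads
    "read_only": False,
    "online_delete": 2000000,
}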

View File

@@ -61,6 +61,7 @@ class ReportingETL
 private:
     std::unique_ptr<BackendInterface> flatMapBackend_;
     std::optional<uint32_t> onlineDeleteInterval_;
+    uint32_t extractorThreads_ = 1;
     std::thread worker_;
     boost::asio::io_context& ioContext_;
@@ -130,6 +131,7 @@ private:
     /// the next ledger validated by the network. If this is set, and the
     /// database is already populated, an error is thrown.
     std::optional<uint32_t> startSequence_;
+    std::optional<uint32_t> finishSequence_;

     /// The time that the most recently published ledger was published. Used by
     /// server_info
@@ -166,7 +168,7 @@ private:
     /// @param startSequence the first ledger to extract
     /// @return the last ledger written to the database, if any
     std::optional<uint32_t>
-    runETLPipeline(uint32_t startSequence);
+    runETLPipeline(uint32_t startSequence, int offset);

     /// Monitor the network for newly validated ledgers. Also monitor the
     /// database to see if any process is writing those ledgers. This function

test.py
View File

@@ -33,24 +33,23 @@ def compareAccountInfo(aldous, p2p):
         print("Response mismatch")
         print(aldous)
         print(p2p)

 def compareTx(aldous, p2p):
     p2p = p2p["result"]
     if aldous["transaction"] != p2p["tx"]:
-        print("Transaction mismatch")
+        print("transaction mismatch")
         print(aldous["transaction"])
         print(p2p["tx"])
         return False
-    if aldous["metadata"] != p2p["meta"]:
-        print("Metadata mismatch")
-        print(aldous["metadata"])
-        print(p2p["metadata"])
+    if aldous["metadata"] != p2p["meta"] and not isinstance(p2p["meta"],dict):
+        print("metadata mismatch")
+        print("aldous : " + aldous["metadata"])
+        print("p2p : " + str(p2p["meta"]))
         return False
-    if aldous["ledger_sequence"] != p2p["ledger_sequence"]:
+    if aldous["ledger_sequence"] != p2p["ledger_index"]:
         print("ledger sequence mismatch")
         print(aldous["ledger_sequence"])
-        print(p2p["ledger_sequence"])
+        print(p2p["ledger_index"])
-    print("Responses match!!")
+    print("responses match!!")
     return True
 def compareAccountTx(aldous, p2p):
@@ -63,8 +62,6 @@ def compareAccountTx(aldous, p2p):
     p2pMetas = []
     p2pLedgerSequences = []
     for x in p2p["transactions"]:
-        if int(x["ledger_index"]) > maxLedger:
-            continue
         p2pTxns.append(x["tx_blob"])
         p2pMetas.append(x["meta"])
         p2pLedgerSequences.append(x["ledger_index"])
@@ -72,8 +69,6 @@ def compareAccountTx(aldous, p2p):
     aldousMetas = []
     aldousLedgerSequences = []
     for x in aldous["transactions"]:
-        if int(x["ledger_sequence"]) < minLedger:
-            continue
         aldousTxns.append(x["transaction"])
         aldousMetas.append(x["metadata"])
         aldousLedgerSequences.append(x["ledger_sequence"])
@@ -171,7 +166,7 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None):
     except websockets.exceptions.ConnectionClosedError as e:
         print(e)

-async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None):
+async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None,numPages=10):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         cursor = None
@@ -202,11 +197,14 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
                 results["transactions"].extend(res["transactions"])
                 if "cursor" in res:
                     cursor = {"ledger_sequence":res["cursor"]["ledger_sequence"],"transaction_index":res["cursor"]["transaction_index"]}
+                    print(cursor)
                 elif "result" in res and "marker" in res["result"]:
+                    print(res["result"]["marker"])
                     marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]}
+                    print(marker)
                 else:
                     break
+                if numCalls > numPages:
+                    break
         return results
     except websockets.exceptions.ConnectionClosedError as e:
         print(e)
@@ -218,6 +216,7 @@ async def tx(ip, port, tx_hash, binary):
             await ws.send(json.dumps({"command":"tx","transaction":tx_hash,"binary":bool(binary)}))
             res = json.loads(await ws.recv())
             print(json.dumps(res,indent=4,sort_keys=True))
+            return res
     except websockets.exceptions.connectionclosederror as e:
         print(e)
@@ -228,17 +227,41 @@ async def ledger_entry(ip, port, index, ledger, binary):
             await ws.send(json.dumps({"command":"ledger_entry","index":index,"binary":bool(binary),"ledger_index":int(ledger)}))
             res = json.loads(await ws.recv())
             print(json.dumps(res,indent=4,sort_keys=True))
+            if "result" in res:
+                res = res["result"]
+            if "object" in res:
+                return (index,res["object"])
+            else:
+                return (index,res["node_binary"])
     except websockets.exceptions.connectionclosederror as e:
         print(e)

+async def ledger_entries(ip, port,ledger):
+    address = 'ws://' + str(ip) + ':' + str(port)
+    entries = await ledger_data(ip, port, ledger, 200, True)
+    try:
+        async with websockets.connect(address) as ws:
+            objects = []
+            for x,y in zip(entries[0],entries[1]):
+                await ws.send(json.dumps({"command":"ledger_entry","index":x,"binary":True,"ledger_index":int(ledger)}))
+                res = json.loads(await ws.recv())
+                objects.append((x,res["object"]))
+                if res["object"] != y:
+                    print("data mismatch")
+                    return None
+            print("Data matches!")
+            return objects
+    except websockets.exceptions.connectionclosederror as e:
+        print(e)
 async def ledger_data(ip, port, ledger, limit, binary):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         async with websockets.connect(address) as ws:
-            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary)}))
+            await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit)}))
             res = json.loads(await ws.recv())
-            print(json.dumps(res,indent=4,sort_keys=True))
             objects = []
             blobs = []
             keys = []
@@ -246,10 +269,14 @@ async def ledger_data(ip, port, ledger, limit, binary):
                 objects = res["result"]["state"]
             else:
                 objects = res["objects"]
+            if binary:
                 for x in objects:
                     blobs.append(x["data"])
                     keys.append(x["index"])
+                    if len(x["index"]) != 64:
+                        print("bad key")
             return (keys,blobs)
     except websockets.exceptions.connectionclosederror as e:
         print(e)
@@ -271,11 +298,13 @@ async def ledger_data_full(ip, port, ledger, binary, limit):
         blobs = []
         keys = []
         async with websockets.connect(address) as ws:
+            if int(limit) < 2048:
+                limit = 2048
             marker = None
             while True:
                 res = {}
                 if marker is None:
-                    await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary), "limit":int(limit)}))
+                    await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":binary, "limit":int(limit)}))
                     res = json.loads(await ws.recv())
                 else:
@@ -334,7 +363,7 @@ def compare_book_offers(aldous, p2p):

-async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary):
+async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer, binary, limit):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
@@ -348,7 +377,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
         taker_pays = json.loads("{\"currency\":\"" + pay_currency + "\"}")
         if pay_issuer is not None:
             taker_pays["issuer"] = pay_issuer
-        req = {"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets, "binary":bool(binary)}
+        req = {"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets, "binary":bool(binary), "limit":int(limit)}
         if cursor is not None:
             req["cursor"] = cursor
         await ws.send(json.dumps(req))
@@ -360,6 +389,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
                 offers.append(x)
             if "cursor" in res:
                 cursor = res["cursor"]
+                print(cursor)
             else:
                 print(len(offers))
                 return offers
@@ -443,36 +473,70 @@ async def ledger_range(ip, port):
             if "error" in res:
                 await ws.send(json.dumps({"command":"server_info"}))
                 res = json.loads(await ws.recv())
+                print(res)
                 rng = res["result"]["info"]["complete_ledgers"]
+                if rng == "empty":
+                    return (0,0)
                 idx = rng.find("-")
                 return (int(rng[0:idx]),int(rng[idx+1:-1]))
             return (res["ledger_index_min"],res["ledger_index_max"])
     except websockets.exceptions.connectionclosederror as e:
         print(e)
+async def fee(ip, port):
+    address = 'ws://' + str(ip) + ':' + str(port)
+    try:
+        async with websockets.connect(address) as ws:
+            await ws.send(json.dumps({"command":"fee"}))
+            res = json.loads(await ws.recv())
+            print(json.dumps(res,indent=4,sort_keys=True))
+    except websockets.exceptions.connectionclosederror as e:
+        print(e)

+async def ledger_diff(ip, port, base, desired, includeBlobs):
+    address = 'ws://' + str(ip) + ':' + str(port)
+    try:
+        async with websockets.connect(address) as ws:
+            await ws.send(json.dumps({"command":"ledger_diff","base_ledger":int(base),"desired_ledger":int(desired),"include_blobs":bool(includeBlobs)}))
+            res = json.loads(await ws.recv())
+            print(json.dumps(res,indent=4,sort_keys=True))
+    except websockets.exceptions.connectionclosederror as e:
+        print(e)

+async def perf(ip, port):
+    res = await ledger_range(ip,port)
+    time.sleep(10)
+    res2 = await ledger_range(ip,port)
+    lps = ((int(res2[1]) - int(res[1])) / 10.0)
+    print(lps)
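Note: perf estimates ingestion rate by sampling the completed ledger range twice, ten seconds apart. The arithmetic it prints, isolated for clarity:

def ledgers_per_second(max_before, max_after, elapsed=10.0):
    # (newest validated sequence after - before) / elapsed seconds
    return (max_after - max_before) / elapsed

# ledgers_per_second(60000000, 60000050) -> 5.0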
 parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
-parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry"])
+parser.add_argument('action', choices=["account_info", "tx", "account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry","ledger_diff","ledger_entries","perf","fee"])
 parser.add_argument('--ip', default='127.0.0.1')
 parser.add_argument('--port', default='8080')
 parser.add_argument('--hash')
-parser.add_argument('--account', default="rw2ciyaNshpHe7bCHo4bRWq6pqqynnWKQg")
+parser.add_argument('--account')
 parser.add_argument('--ledger')
 parser.add_argument('--limit', default='200')
 parser.add_argument('--taker_pays_issuer',default='rvYAfWj5gh67oV6fW32ZzP3Aw4Eubs59B')
 parser.add_argument('--taker_pays_currency',default='USD')
 parser.add_argument('--taker_gets_issuer')
 parser.add_argument('--taker_gets_currency',default='XRP')
-parser.add_argument('--p2pIp', default='127.0.0.1')
-parser.add_argument('--p2pPort', default='6005')
+parser.add_argument('--p2pIp', default='s2.ripple.com')
+parser.add_argument('--p2pPort', default='51233')
 parser.add_argument('--verify',default=False)
-parser.add_argument('--binary',default=False)
+parser.add_argument('--binary',default=True)
 parser.add_argument('--expand',default=False)
 parser.add_argument('--transactions',default=False)
 parser.add_argument('--minLedger',default=-1)
 parser.add_argument('--maxLedger',default=-1)
 parser.add_argument('--filename',default=None)
 parser.add_argument('--index')
+parser.add_argument('--numPages',default=3)
+parser.add_argument('--base')
+parser.add_argument('--desired')
+parser.add_argument('--includeBlobs',default=False)
@@ -483,7 +547,12 @@ def run(args):
     asyncio.set_event_loop(asyncio.new_event_loop())
     if(args.ledger is None):
         args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1]
-    if args.action == "account_info":
+    if args.action == "fee":
+        asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port))
+    elif args.action == "perf":
+        asyncio.get_event_loop().run_until_complete(
+            perf(args.ip,args.port))
+    elif args.action == "account_info":
         res1 = asyncio.get_event_loop().run_until_complete(
             account_info(args.ip, args.port, args.account, args.ledger, args.binary))
         if args.verify:
@@ -493,18 +562,46 @@ def run(args):
     elif args.action == "ledger_entry":
         asyncio.get_event_loop().run_until_complete(
             ledger_entry(args.ip, args.port, args.index, args.ledger, args.binary))
+    elif args.action == "ledger_entries":
+        res = asyncio.get_event_loop().run_until_complete(
+            ledger_entries(args.ip, args.port, args.ledger))
+        if args.verify:
+            objects = []
+            for x in res:
+                res2 = asyncio.get_event_loop().run_until_complete(
+                    ledger_entry(args.p2pIp, args.p2pPort,x[0] , args.ledger, True))
+                if res2[1] != x[1]:
+                    print("mismatch!")
+                    return
+            print("Data matches!")
+    elif args.action == "ledger_diff":
+        asyncio.get_event_loop().run_until_complete(
+            ledger_diff(args.ip, args.port, args.base, args.desired, args.includeBlobs))
     elif args.action == "tx":
+        if args.verify:
+            args.binary = True
         if args.hash is None:
             args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
-        asyncio.get_event_loop().run_until_complete(
+        res = asyncio.get_event_loop().run_until_complete(
             tx(args.ip, args.port, args.hash, args.binary))
+        if args.verify:
+            res2 = asyncio.get_event_loop().run_until_complete(
+                tx(args.p2pIp, args.p2pPort, args.hash, args.binary))
+            print(compareTx(res,res2))
     elif args.action == "account_tx":
         if args.verify:
             args.binary=True
+        if args.account is None:
+            args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
+            res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
+            args.account = res["transaction"]["Account"]
-        rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))
         res = asyncio.get_event_loop().run_until_complete(
             account_tx(args.ip, args.port, args.account, args.binary))
+        rng = getMinAndMax(res)
         if args.verify:
             res2 = asyncio.get_event_loop().run_until_complete(
@@ -513,13 +610,19 @@ def run(args):
     elif args.action == "account_tx_full":
         if args.verify:
             args.binary=True
-        rng = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))
+        if args.account is None:
+            args.hash = getHashes(asyncio.get_event_loop().run_until_complete(ledger(args.ip,args.port,args.ledger,False,True,False)))[0]
+            res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
+            args.account = res["transaction"]["Account"]
         res = asyncio.get_event_loop().run_until_complete(
-            account_tx_full(args.ip, args.port, args.account, args.binary))
+            account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages)))
+        rng = getMinAndMax(res)
         print(len(res["transactions"]))
         if args.verify:
+            print("requesting p2p node")
             res2 = asyncio.get_event_loop().run_until_complete(
-                account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1]))
+                account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1],int(args.numPages)))
             print(compareAccountTx(res,res2))
     elif args.action == "ledger_data":
@@ -559,10 +662,10 @@ def run(args):
         if args.verify:
             args.binary=True
         res = asyncio.get_event_loop().run_until_complete(
-            book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary))
+            book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary,args.limit))
         if args.verify:
             res2 = asyncio.get_event_loop().run_until_complete(
-                book_offers(args.p2pIp, args.p2pPort, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary))
+                book_offers(args.p2pIp, args.p2pPort, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer, args.binary, args.limit))
             print(compare_book_offers(res,res2))
     else: