Merge branch 'postgres_partitioning' into master_new

CJ Cobb
2021-04-28 19:55:25 +00:00
10 changed files with 357 additions and 86 deletions

View File

@@ -99,15 +99,19 @@ def parseLogs(filename, interval, minTxnCount = 0):
                + objCount + " : "
                + txnsPerSecond + " : "
                + objsPerSecond)
-        print("Interval Aggregate ( " + str(interval) + " ) [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ")
+        print("Interval Aggregate ( " + str(interval) + " ) [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ")
         print(str(intervalLedgers) + " : "
+            + str(intervalTxns) + " : "
+            + str(intervalObjs) + " : "
             + str(intervalEnd - intervalStart) + " : "
             + str(intervalLedgersPerSecond) + " : "
             + str(intervalLoadTime/intervalLedgers) + " : "
             + str(intervalTxns/intervalTime) + " : "
             + str(intervalObjs/intervalTime))
-    print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]")
+    print("Total Aggregate: [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]")
     print(str(totalLedgers) + " : "
+        + str(totalTxns) + " : "
+        + str(totalObjs) + " : "
         + str(end-start) + " : "
         + str(ledgersPerSecond) + " : "
         + str(totalLoadTime/totalLedgers) + " : "
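The interval and total aggregates now report transaction and object counts ahead of the timing fields. A minimal sketch (hypothetical helper, not part of this commit) of reading one of the new colon-separated aggregate lines back into a dict, assuming the field order printed above:

def parse_aggregate(line):
    # Field order matches the new print statements:
    # ledgers : txns : objects : elapsedTime : ledgersPerSec : avgLoadTime : txPerSec : objsPerSec
    fields = ["ledgers", "txns", "objects", "elapsedTime",
              "ledgersPerSec", "avgLoadTime", "txPerSec", "objsPerSec"]
    return dict(zip(fields, (float(v) for v in line.split(" : "))))

print(parse_aggregate("100 : 25000 : 400000 : 12.5 : 8.0 : 0.125 : 2000.0 : 32000.0"))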

View File

@@ -132,7 +132,7 @@ public:
    // Open the database. Set up all of the necessary objects and
    // datastructures. After this call completes, the database is ready for use.
    virtual void
-    open() = 0;
+    open(bool readOnly) = 0;
    // Close the database, releasing any resources
    virtual void

View File

@@ -273,7 +273,7 @@ CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
}
void
-CassandraBackend::open()
+CassandraBackend::open(bool readOnly)
{
    std::cout << config_ << std::endl;
    auto getString = [this](std::string const& field) -> std::string {

View File

@@ -665,7 +665,7 @@ public:
    // Create the table if it doesn't exist already
    // @param createIfMissing ignored
    void
-    open() override;
+    open(bool readOnly) override;
    // Close the connection to the database
    void

View File

@@ -747,8 +747,17 @@ CREATE TABLE IF NOT EXISTS objects (
    key bytea NOT NULL,
    ledger_seq bigint NOT NULL,
    object bytea,
-    PRIMARY KEY(key, ledger_seq)
-);
+    PRIMARY KEY(ledger_seq, key)
+) PARTITION BY RANGE (ledger_seq);
+create table if not exists objects1 partition of objects for values from (0) to (10000000);
+create table if not exists objects2 partition of objects for values from (10000000) to (20000000);
+create table if not exists objects3 partition of objects for values from (20000000) to (30000000);
+create table if not exists objects4 partition of objects for values from (30000000) to (40000000);
+create table if not exists objects5 partition of objects for values from (40000000) to (50000000);
+create table if not exists objects6 partition of objects for values from (50000000) to (60000000);
+create table if not exists objects7 partition of objects for values from (60000000) to (70000000);
-- Index for lookups by ledger hash.
CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
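The objects table is now range-partitioned on ledger_seq, with 10-million-ledger partitions pre-created up to ledger 70,000,000; ledgers past that ceiling would need another partition before they can be written. A minimal sketch (hypothetical helper, not part of this commit) of generating the DDL for whichever partition a given sequence falls into:

PARTITION_SIZE = 10000000

def partition_ddl(table, ledger_seq):
    # Each partition covers one 10M-ledger bucket; objects1 covers [0, 10000000), etc.
    idx = ledger_seq // PARTITION_SIZE
    lo = idx * PARTITION_SIZE
    hi = lo + PARTITION_SIZE
    return ("create table if not exists " + table + str(idx + 1) +
            " partition of " + table +
            " for values from (" + str(lo) + ") to (" + str(hi) + ");")

print(partition_ddl("objects", 72345678))
# -> create table if not exists objects8 partition of objects for values from (70000000) to (80000000);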
@@ -757,24 +766,39 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
-- Transactions table. Deletes from the ledger table
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS transactions (
-    hash bytea PRIMARY KEY,
-    ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
+    hash bytea NOT NULL,
+    ledger_seq bigint NOT NULL,
    transaction bytea NOT NULL,
    metadata bytea NOT NULL
-);
--- Index for lookups by ledger hash.
-CREATE INDEX IF NOT EXISTS ledgers_ledger_seq_idx ON transactions
-    USING hash (ledger_seq);
+) PARTITION BY RANGE (ledger_seq);
+create table if not exists transactions1 partition of transactions for values from (0) to (10000000);
+create table if not exists transactions2 partition of transactions for values from (10000000) to (20000000);
+create table if not exists transactions3 partition of transactions for values from (20000000) to (30000000);
+create table if not exists transactions4 partition of transactions for values from (30000000) to (40000000);
+create table if not exists transactions5 partition of transactions for values from (40000000) to (50000000);
+create table if not exists transactions6 partition of transactions for values from (50000000) to (60000000);
+create table if not exists transactions7 partition of transactions for values from (60000000) to (70000000);
+create index if not exists tx_by_hash on transactions using hash (hash);
+create index if not exists tx_by_lgr_seq on transactions using hash (ledger_seq);
-- Table that maps accounts to transactions affecting them. Deletes from the
-- ledger table cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS account_transactions (
    account bytea NOT NULL,
-    ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
+    ledger_seq bigint NOT NULL,
    transaction_index bigint NOT NULL,
    hash bytea NOT NULL,
-    PRIMARY KEY (account, ledger_seq, transaction_index)
-);
+    PRIMARY KEY (account, ledger_seq, transaction_index, hash)
+) PARTITION BY RANGE (ledger_seq);
+create table if not exists account_transactions1 partition of account_transactions for values from (0) to (10000000);
+create table if not exists account_transactions2 partition of account_transactions for values from (10000000) to (20000000);
+create table if not exists account_transactions3 partition of account_transactions for values from (20000000) to (30000000);
+create table if not exists account_transactions4 partition of account_transactions for values from (30000000) to (40000000);
+create table if not exists account_transactions5 partition of account_transactions for values from (40000000) to (50000000);
+create table if not exists account_transactions6 partition of account_transactions for values from (50000000) to (60000000);
+create table if not exists account_transactions7 partition of account_transactions for values from (60000000) to (70000000);
-- Table that maps a book to a list of offers in that book. Deletes from the ledger table
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS books (
@@ -785,6 +809,113 @@ CREATE TABLE IF NOT EXISTS books (
    PRIMARY KEY(book, offer_key, deleted)
);
-- account_tx() RPC helper. From the rippled reporting process, only the
-- parameters without defaults are required. For the parameters with
-- defaults, validation should be done by rippled, such as:
-- _in_account_id should be a valid xrp base58 address.
-- _in_forward either true or false according to the published api
-- _in_limit should be validated and not simply passed through from
-- client.
--
-- For _in_ledger_index_min and _in_ledger_index_max, if passed in the
-- request, verify that their type is int and pass through as is.
-- For _ledger_hash, verify and convert from hex length 32 bytes and
-- prepend with \x (\\x C++).
--
-- For _in_ledger_index, if the input type is integer, then pass through
-- as is. If the type is string and contents = validated, then do not
-- set _in_ledger_index. Instead set _in_invalidated to TRUE.
--
-- There is no need for rippled to do any type of lookup on max/min
-- ledger range, lookup of hash, or the like. This function does those
-- things, including error responses if bad input. Only the above must
-- be done to set the correct search range.
--
-- If a marker is present in the request, verify the members 'ledger'
-- and 'seq' are integers and they correspond to _in_marker_seq
-- and _in_marker_index, respectively.
-- To reiterate:
-- JSON input field 'ledger' corresponds to _in_marker_seq
-- JSON input field 'seq' corresponds to _in_marker_index
CREATE OR REPLACE FUNCTION account_tx(
_in_account_id bytea,
_in_limit bigint,
_in_marker_seq bigint DEFAULT NULL::bigint,
_in_marker_index bigint DEFAULT NULL::bigint)
RETURNS jsonb
AS $$
DECLARE
_min bigint;
_max bigint;
_marker bool;
_between_min bigint;
_between_max bigint;
_sql text;
_cursor refcursor;
_result jsonb;
_record record;
_tally bigint := 0;
_ret_marker jsonb;
_transactions jsonb[] := '{}';
BEGIN
_min := min_ledger();
_max := max_ledger();
IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN
_marker := TRUE;
IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN
-- The rippled implementation returns no transaction results
-- if either of these values are missing.
_between_min := 0;
_between_max := 0;
ELSE
_between_min := _min;
_between_max := _in_marker_seq;
END IF;
ELSE
_marker := FALSE;
_between_min := _min;
_between_max := _max;
END IF;
_sql := format('SELECT hash, ledger_seq, transaction_index FROM account_transactions WHERE account = $1
AND ledger_seq BETWEEN $2 AND $3 ORDER BY ledger_seq DESC, transaction_index DESC');
OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min, _between_max;
LOOP
FETCH _cursor INTO _record;
IF _record IS NULL THEN EXIT; END IF;
IF _marker IS TRUE THEN
IF _in_marker_seq = _record.ledger_seq THEN
IF _in_marker_index < _record.transaction_index THEN
CONTINUE;
END IF;
END IF;
_marker := FALSE;
END IF;
_tally := _tally + 1;
IF _tally > _in_limit THEN
_ret_marker := jsonb_build_object(
'ledger_sequence', _record.ledger_seq,
'transaction_index', _record.transaction_index);
EXIT;
END IF;
-- Is the transaction index in the tx object?
_transactions := _transactions || jsonb_build_object('hash',_record.hash);
END LOOP;
CLOSE _cursor;
_result := jsonb_build_object('ledger_index_min', _min,
'ledger_index_max', _max,
'transactions', _transactions);
IF _ret_marker IS NOT NULL THEN
_result := _result || jsonb_build_object('cursor', _ret_marker);
END IF;
RETURN _result;
END;
$$ LANGUAGE plpgsql;
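A minimal sketch of invoking the new account_tx function directly and walking its jsonb result (psycopg2, the connection string, and the sample account bytes are assumptions for illustration; they are not part of this commit):

import json
import psycopg2

conn = psycopg2.connect("dbname=reporting")  # placeholder DSN
with conn, conn.cursor() as cur:
    # account is bytea, limit is bigint; the two marker parameters default to NULL
    cur.execute("SELECT account_tx(%s::bytea, %s::bigint)",
                (psycopg2.Binary(bytes.fromhex("0a" * 20)), 20))
    result = cur.fetchone()[0]
    if isinstance(result, str):          # jsonb may come back already parsed or as text
        result = json.loads(result)
    print(result["ledger_index_min"], result["ledger_index_max"])
    for tx in result["transactions"]:
        print(tx["hash"])                # hex-encoded bytea string, e.g. "\\x41ab..."
    if "cursor" in result:
        print("resume from", result["cursor"])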
-- Avoid inadvertent administrative tampering with committed data.
CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO
    ledgers DO INSTEAD NOTHING;

View File

@@ -1,3 +1,4 @@
+#include <boost/asio.hpp>
#include <boost/format.hpp>
#include <reporting/PostgresBackend.h>
namespace Backend {
@@ -5,6 +6,10 @@ namespace Backend {
PostgresBackend::PostgresBackend(boost::json::object const& config)
    : pgPool_(make_PgPool(config)), writeConnection_(pgPool_)
{
+    if (config.contains("write_interval"))
+    {
+        writeInterval_ = config.at("write_interval").as_int64();
+    }
}
void
PostgresBackend::writeLedger(
@@ -66,9 +71,13 @@ PostgresBackend::writeLedgerObject(
    numRowsInObjectsBuffer_++;
    // If the buffer gets too large, the insert fails. Not sure why. So we
    // insert after 1 million records
-    if (numRowsInObjectsBuffer_ % 1000000 == 0)
+    if (numRowsInObjectsBuffer_ % writeInterval_ == 0)
    {
+        BOOST_LOG_TRIVIAL(info)
+            << __func__ << " Flushing large buffer. num objects = "
+            << numRowsInObjectsBuffer_;
        writeConnection_.bulkInsert("objects", objectsBuffer_.str());
+        BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
        objectsBuffer_ = {};
    }
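The flush threshold is now read from the write_interval config key instead of being hard-coded at one million rows. A minimal sketch of the same accumulate-and-flush pattern (illustration only; the flush callback is a stand-in for the bulkInsert call above):

class BufferedWriter:
    def __init__(self, flush_fn, write_interval=1000000):
        self.flush_fn = flush_fn            # stand-in for bulkInsert("objects", ...)
        self.write_interval = write_interval
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        # Flush whenever the buffer reaches the configured interval
        if len(self.rows) % self.write_interval == 0:
            self.flush_fn(self.rows)
            self.rows = []

writer = BufferedWriter(lambda rows: print("flushing", len(rows), "rows"), write_interval=3)
for i in range(7):
    writer.add(i)                           # flushes after the 3rd and 6th rows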
@@ -410,34 +419,90 @@ std::vector<TransactionAndMetadata>
PostgresBackend::fetchTransactions(
    std::vector<ripple::uint256> const& hashes) const
{
-    PgQuery pgQuery(pgPool_);
-    pgQuery("SET statement_timeout TO 10000");
-    std::stringstream sql;
-    sql << "SELECT transaction,metadata,ledger_seq FROM transactions "
-           "WHERE ";
-    bool first = true;
-    for (auto const& hash : hashes)
-    {
-        if (!first)
-            sql << " OR ";
-        sql << "HASH = \'\\x" << ripple::strHex(hash) << "\'";
-        first = false;
-    }
-    auto res = pgQuery(sql.str().data());
-    if (size_t numRows = checkResult(res, 3))
-    {
-        std::vector<TransactionAndMetadata> results;
-        for (size_t i = 0; i < numRows; ++i)
-        {
-            results.push_back(
-                {res.asUnHexedBlob(i, 0),
-                 res.asUnHexedBlob(i, 1),
-                 res.asBigInt(i, 2)});
-        }
-        return results;
-    }
-    return {};
+    std::vector<TransactionAndMetadata> results;
+    constexpr bool doAsync = true;
+    if (doAsync)
+    {
+        auto start = std::chrono::system_clock::now();
+        auto end = std::chrono::system_clock::now();
+        auto duration = ((end - start).count()) / 1000000000.0;
+        BOOST_LOG_TRIVIAL(info) << __func__ << " created threadpool. took "
+                                << std::to_string(duration);
+        results.resize(hashes.size());
+        std::condition_variable cv;
+        std::mutex mtx;
+        std::atomic_uint numRemaining = hashes.size();
+        for (size_t i = 0; i < hashes.size(); ++i)
+        {
+            auto const& hash = hashes[i];
+            boost::asio::post(
+                pool_, [this, &hash, &results, &numRemaining, &cv, &mtx, i]() {
+                    BOOST_LOG_TRIVIAL(debug)
+                        << __func__ << " getting txn = " << i;
+                    PgQuery pgQuery(pgPool_);
+                    std::stringstream sql;
+                    sql << "SELECT transaction,metadata,ledger_seq FROM "
+                           "transactions "
+                           "WHERE HASH = \'\\x"
+                        << ripple::strHex(hash) << "\'";
+                    auto res = pgQuery(sql.str().data());
+                    if (size_t numRows = checkResult(res, 3))
+                    {
+                        results[i] = {
+                            res.asUnHexedBlob(0, 0),
+                            res.asUnHexedBlob(0, 1),
+                            res.asBigInt(0, 2)};
+                    }
+                    if (--numRemaining == 0)
+                    {
+                        std::unique_lock lck(mtx);
+                        cv.notify_one();
+                    }
+                });
+        }
+        std::unique_lock lck(mtx);
+        cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
+        auto end2 = std::chrono::system_clock::now();
+        duration = ((end2 - end).count()) / 1000000000.0;
+        BOOST_LOG_TRIVIAL(info)
+            << __func__ << " fetched " << std::to_string(hashes.size())
+            << " transactions with threadpool. took "
+            << std::to_string(duration);
+    }
+    else
+    {
+        PgQuery pgQuery(pgPool_);
+        pgQuery("SET statement_timeout TO 10000");
+        std::stringstream sql;
+        for (size_t i = 0; i < hashes.size(); ++i)
+        {
+            auto const& hash = hashes[i];
+            sql << "SELECT transaction,metadata,ledger_seq FROM "
+                   "transactions "
+                   "WHERE HASH = \'\\x"
+                << ripple::strHex(hash) << "\'";
+            if (i + 1 < hashes.size())
+                sql << " UNION ALL ";
+        }
+        auto start = std::chrono::system_clock::now();
+        auto res = pgQuery(sql.str().data());
+        auto end = std::chrono::system_clock::now();
+        auto duration = ((end - start).count()) / 1000000000.0;
+        BOOST_LOG_TRIVIAL(info)
+            << __func__ << " fetched " << std::to_string(hashes.size())
+            << " transactions with union all. took "
+            << std::to_string(duration);
+        if (size_t numRows = checkResult(res, 3))
+        {
+            for (size_t i = 0; i < numRows; ++i)
+                results.push_back(
+                    {res.asUnHexedBlob(i, 0),
+                     res.asUnHexedBlob(i, 1),
+                     res.asBigInt(i, 2)});
+        }
+    }
+    return results;
}
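The asynchronous path posts one single-hash query per transaction to the 200-thread pool, writes each result into its slot by index, and wakes the caller when an atomic countdown reaches zero. A minimal sketch of the same fan-out shape in Python (fetch_one is a hypothetical per-hash query helper; ThreadPoolExecutor stands in for boost::asio::post plus the condition_variable wait):

from concurrent.futures import ThreadPoolExecutor

def fetch_transactions(hashes, fetch_one, max_workers=200):
    results = [None] * len(hashes)          # one slot per hash, filled by index

    def worker(i):
        results[i] = fetch_one(hashes[i])   # each task runs its own SELECT

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for i in range(len(hashes)):
            pool.submit(worker, i)
        # leaving the with-block joins all tasks, i.e. the numRemaining == 0 wait
    return results

print(fetch_transactions(["a1", "b2", "c3"], lambda h: "row-for-" + h, max_workers=4))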
std::vector<Blob> std::vector<Blob>
@@ -493,44 +558,77 @@ PostgresBackend::fetchAccountTransactions(
{
    PgQuery pgQuery(pgPool_);
    pgQuery("SET statement_timeout TO 10000");
-    std::stringstream sql;
-    sql << "SELECT hash, ledger_seq, transaction_index FROM "
-           "account_transactions WHERE account = "
-        << "\'\\x" << ripple::strHex(account) << "\'";
-    if (cursor)
-        sql << " AND (ledger_seq < " << cursor->ledgerSequence
-            << " OR (ledger_seq = " << cursor->ledgerSequence
-            << " AND transaction_index < " << cursor->transactionIndex << "))";
-    sql << " ORDER BY ledger_seq DESC, transaction_index DESC";
-    sql << " LIMIT " << std::to_string(limit);
-    BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << sql.str();
-    auto res = pgQuery(sql.str().data());
-    if (size_t numRows = checkResult(res, 3))
-    {
-        std::vector<ripple::uint256> hashes;
-        for (size_t i = 0; i < numRows; ++i)
-        {
-            hashes.push_back(res.asUInt256(i, 0));
-        }
-        if (numRows == limit)
-        {
-            AccountTransactionsCursor retCursor{
-                res.asBigInt(numRows - 1, 1), res.asBigInt(numRows - 1, 2)};
-            return {fetchTransactions(hashes), {retCursor}};
-        }
-        else
-        {
-            return {fetchTransactions(hashes), {}};
-        }
-    }
-    return {};
-}
+    pg_params dbParams;
+    char const*& command = dbParams.first;
+    std::vector<std::optional<std::string>>& values = dbParams.second;
+    command =
+        "SELECT account_tx($1::bytea, $2::bigint, "
+        "$3::bigint, $4::bigint)";
+    values.resize(4);
+    values[0] = "\\x" + strHex(account);
+    values[1] = std::to_string(limit);
+    if (cursor)
+    {
+        values[2] = std::to_string(cursor->ledgerSequence);
+        values[3] = std::to_string(cursor->transactionIndex);
+    }
+    for (size_t i = 0; i < values.size(); ++i)
+    {
+        BOOST_LOG_TRIVIAL(debug) << "value " << std::to_string(i) << " = "
+                                 << (values[i] ? values[i].value() : "null");
+    }
+    auto start = std::chrono::system_clock::now();
+    auto res = pgQuery(dbParams);
+    auto end = std::chrono::system_clock::now();
+    auto duration = ((end - start).count()) / 1000000000.0;
+    BOOST_LOG_TRIVIAL(info)
+        << __func__ << " : executed stored_procedure in "
+        << std::to_string(duration)
+        << " num records = " << std::to_string(checkResult(res, 1));
+    checkResult(res, 1);
+    char const* resultStr = res.c_str();
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
+                             << "postgres result = " << resultStr
+                             << " : account = " << strHex(account);
+    boost::json::value raw = boost::json::parse(resultStr);
+    boost::json::object responseObj = raw.as_object();
+    BOOST_LOG_TRIVIAL(debug) << " parsed = " << responseObj;
+    if (responseObj.contains("transactions"))
+    {
+        auto txns = responseObj.at("transactions").as_array();
+        std::vector<ripple::uint256> hashes;
+        for (auto& hashHex : txns)
+        {
+            ripple::uint256 hash;
+            if (hash.parseHex(hashHex.at("hash").as_string().c_str() + 2))
+                hashes.push_back(hash);
+        }
+        if (responseObj.contains("cursor"))
+        {
+            return {
+                fetchTransactions(hashes),
+                {{responseObj.at("cursor").at("ledger_sequence").as_int64(),
+                  responseObj.at("cursor")
+                      .at("transaction_index")
+                      .as_int64()}}};
+        }
+        return {fetchTransactions(hashes), {}};
+    }
+    return {{}, {}};
+}
void
-PostgresBackend::open()
+PostgresBackend::open(bool readOnly)
{
-    initSchema(pgPool_);
+    if (!readOnly)
+        initSchema(pgPool_);
}
void
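fetchAccountTransactions now hands paging to the account_tx stored procedure and resumes from the cursor it returns. A minimal sketch of that resume loop (run_account_tx is a hypothetical wrapper around the pgQuery(dbParams) call above; illustration only):

def fetch_all_pages(run_account_tx, account, limit=200):
    # run_account_tx(account, limit, marker_seq, marker_index) returns the parsed jsonb dict
    hashes, marker_seq, marker_index = [], None, None
    while True:
        page = run_account_tx(account, limit, marker_seq, marker_index)
        hashes += [tx["hash"] for tx in page.get("transactions", [])]
        cursor = page.get("cursor")
        if cursor is None:                  # no cursor means the last page was reached
            break
        marker_seq = cursor["ledger_sequence"]
        marker_index = cursor["transaction_index"]
    return hashes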
@@ -641,12 +739,13 @@ PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
                }
                else
                {
-                    // This is rather unelegant. For a deleted object, we don't
-                    // know its type just from the key (or do we?). So, we just
-                    // assume it is an offer and try to delete it. The
-                    // alternative is to read the actual object out of the db
-                    // from before it was deleted. This could result in a lot of
-                    // individual reads though, so we chose to just delete
+                    // This is rather unelegant. For a deleted object, we
+                    // don't know its type just from the key (or do we?).
+                    // So, we just assume it is an offer and try to delete
+                    // it. The alternative is to read the actual object out
+                    // of the db from before it was deleted. This could
+                    // result in a lot of individual reads though, so we
+                    // chose to just delete
                    deleteOffer = true;
                }
                if (deleteOffer)

View File

@@ -15,6 +15,8 @@ private:
    std::shared_ptr<PgPool> pgPool_;
    mutable PgQuery writeConnection_;
    mutable bool abortWrite_ = false;
+    mutable boost::asio::thread_pool pool_{200};
+    uint32_t writeInterval_ = 1000000;
public:
    PostgresBackend(boost::json::object const& config);
@@ -99,7 +101,7 @@ public:
        std::vector<AccountTransactionsData>&& data) const override;
    void
-    open() override;
+    open(bool readOnly) override;
    void
    close() override;

View File

@@ -303,7 +303,24 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
            std::move(bookDir));
    }
    flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
-    bool success = flatMapBackend_->finishWrites();
+    accumTxns_ += rawData.transactions_list().transactions_size();
+    bool success = true;
+    if (accumTxns_ > txnThreshold_)
+    {
+        auto start = std::chrono::system_clock::now();
+        success = flatMapBackend_->finishWrites();
+        auto end = std::chrono::system_clock::now();
+        auto duration = ((end - start).count()) / 1000000000.0;
+        BOOST_LOG_TRIVIAL(info)
+            << __func__ << " Accumulated " << std::to_string(accumTxns_)
+            << " transactions. Wrote in " << std::to_string(duration)
+            << " transactions per second = "
+            << std::to_string(accumTxns_ / duration);
+        accumTxns_ = 0;
+    }
+    else
+        BOOST_LOG_TRIVIAL(info) << __func__ << " skipping commit";
    BOOST_LOG_TRIVIAL(debug)
        << __func__ << " : "
        << "Inserted/modified/deleted all objects. Number of objects = "
@@ -694,7 +711,6 @@ ReportingETL::ReportingETL(
        networkValidatedLedgers_,
        ioc)
{
-    flatMapBackend_->open();
    if (config.contains("start_sequence"))
        startSequence_ = config.at("start_sequence").as_int64();
    if (config.contains("finish_sequence"))
@@ -705,5 +721,8 @@ ReportingETL::ReportingETL(
        onlineDeleteInterval_ = config.at("online_delete").as_int64();
    if (config.contains("extractor_threads"))
        extractorThreads_ = config.at("extractor_threads").as_int64();
+    if (config.contains("txn_threshold"))
+        txnThreshold_ = config.at("txn_threshold").as_int64();
+    flatMapBackend_->open(readOnly_);
}

View File

@@ -133,6 +133,9 @@ private:
    std::optional<uint32_t> startSequence_;
    std::optional<uint32_t> finishSequence_;
+    size_t accumTxns_ = 0;
+    size_t txnThreshold_ = 0;
    /// The time that the most recently published ledger was published. Used by
    /// server_info
    std::chrono::time_point<std::chrono::system_clock> lastPublish_;

17
test.py
View File

@@ -137,7 +137,6 @@ def getMinAndMax(res):
    minSeq = None
    maxSeq = None
    for x in res["transactions"]:
-        print(x)
        seq = None
        if "ledger_sequence" in x:
            seq = int(x["ledger_sequence"])
@@ -162,10 +161,11 @@ async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None):
            res = json.loads(await ws.recv())
            print(json.dumps(res,indent=4,sort_keys=True))
+            print(res["cursor"])
            return res
    except websockets.exceptions.ConnectionClosedError as e:
        print(e)
+import datetime
async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None,numPages=10):
    address = 'ws://' + str(ip) + ':' + str(port)
    try:
@@ -184,8 +184,12 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
                if minLedger is not None and maxLedger is not None:
                    req["ledger_index_min"] = minLedger
                    req["ledger_index_max"] = maxLedger
+                print(req)
+                start = datetime.datetime.now().timestamp()
                await ws.send(json.dumps(req))
                res = json.loads(await ws.recv())
+                end = datetime.datetime.now().timestamp()
+                print(end - start)
                #print(json.dumps(res,indent=4,sort_keys=True))
                if "result" in res:
                    print(len(res["result"]["transactions"]))
@@ -202,8 +206,10 @@ async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=No
marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]} marker={"ledger":res["result"]["marker"]["ledger"],"seq":res["result"]["marker"]["seq"]}
print(marker) print(marker)
else: else:
print(res)
break break
if numCalls > numPages: if numCalls > numPages:
print("breaking")
break break
return results return results
except websockets.exceptions.ConnectionClosedError as e: except websockets.exceptions.ConnectionClosedError as e:
@@ -615,10 +621,17 @@ def run(args):
        res = asyncio.get_event_loop().run_until_complete(tx(args.ip,args.port,args.hash,False))
        args.account = res["transaction"]["Account"]
+    print("starting")
    res = asyncio.get_event_loop().run_until_complete(
            account_tx_full(args.ip, args.port, args.account, args.binary,None,None,int(args.numPages)))
    rng = getMinAndMax(res)
    print(len(res["transactions"]))
+    print(args.account)
+    txs = set()
+    for x in res["transactions"]:
+        txs.add((x["transaction"],x["ledger_sequence"]))
+    print(len(txs))
    if args.verify:
        print("requesting p2p node")
        res2 = asyncio.get_event_loop().run_until_complete(