add date to transaction tables

This commit is contained in:
CJ Cobb
2021-09-07 10:46:10 -04:00
parent 553be19882
commit cc76ece6f2
7 changed files with 55 additions and 45 deletions

View File

@@ -36,6 +36,7 @@ struct TransactionAndMetadata
Blob transaction; Blob transaction;
Blob metadata; Blob metadata;
uint32_t ledgerSequence; uint32_t ledgerSequence;
uint32_t date;
bool bool
operator==(const TransactionAndMetadata&) const = default; operator==(const TransactionAndMetadata&) const = default;
}; };
@@ -201,15 +202,14 @@ protected:
bool isFirst = false) const = 0; bool isFirst = false) const = 0;
void void
writeLedgerObject( writeLedgerObject(std::string&& key, uint32_t seq, std::string&& blob)
std::string&& key, const;
uint32_t seq,
std::string&& blob) const;
virtual void virtual void
writeTransaction( writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const = 0; std::string&& metadata) const = 0;
@@ -257,10 +257,8 @@ private:
std::uint32_t limit) const = 0; std::uint32_t limit) const = 0;
virtual void virtual void
doWriteLedgerObject( doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob)
std::string&& key, const = 0;
uint32_t seq,
std::string&& blob) const = 0;
virtual bool virtual bool
doFinishWrites() const = 0; doFinishWrites() const = 0;

View File

@@ -226,6 +226,7 @@ void
CassandraBackend::writeTransaction( CassandraBackend::writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const std::string&& metadata) const
{ {
@@ -242,12 +243,17 @@ CassandraBackend::writeTransaction(
makeAndExecuteAsyncWrite( makeAndExecuteAsyncWrite(
this, this,
std::move(std::make_tuple( std::move(std::make_tuple(
std::move(hash), seq, std::move(transaction), std::move(metadata))), std::move(hash),
seq,
date,
std::move(transaction),
std::move(metadata))),
[this](auto& params) { [this](auto& params) {
CassandraStatement statement{insertTransaction_}; CassandraStatement statement{insertTransaction_};
auto& [hash, sequence, transaction, metadata] = params.data; auto& [hash, sequence, date, transaction, metadata] = params.data;
statement.bindNextBytes(hash); statement.bindNextBytes(hash);
statement.bindNextInt(sequence); statement.bindNextInt(sequence);
statement.bindNextInt(date);
statement.bindNextBytes(transaction); statement.bindNextBytes(transaction);
statement.bindNextBytes(metadata); statement.bindNextBytes(metadata);
return statement; return statement;
@@ -346,6 +352,7 @@ CassandraBackend::fetchTransactions(
results[i] = { results[i] = {
result.getBytes(), result.getBytes(),
result.getBytes(), result.getBytes(),
result.getUInt32(),
result.getUInt32()}; result.getUInt32()};
})); }));
executeAsyncRead(statement, processAsyncRead, *cbs[i]); executeAsyncRead(statement, processAsyncRead, *cbs[i]);
@@ -1058,11 +1065,11 @@ CassandraBackend::open(bool readOnly)
continue; continue;
query.str(""); query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" query
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, " << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
"transaction " << " ( hash blob PRIMARY KEY, ledger_sequence bigint, date bigint, "
"blob, metadata blob)" "transaction blob, metadata blob)"
<< " WITH default_time_to_live = " << std::to_string(ttl); << " WITH default_time_to_live = " << std::to_string(ttl);
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
query.str(""); query.str("");
@@ -1079,15 +1086,8 @@ CassandraBackend::open(bool readOnly)
<< " LIMIT 1"; << " LIMIT 1";
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
query.str(""); query.str("");
query << "CREATE INDEX ON " << tablePrefix query << "SELECT * FROM " << tablePrefix << "ledger_transactions"
<< "transactions(ledger_sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix
<< "transactions WHERE ledger_sequence = 1"
<< " LIMIT 1"; << " LIMIT 1";
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
@@ -1179,9 +1179,8 @@ CassandraBackend::open(bool readOnly)
query.str(""); query.str("");
query << "INSERT INTO " << tablePrefix << "transactions" query << "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, ledger_sequence, transaction, metadata) VALUES " << " (hash, ledger_sequence, date, transaction, metadata) VALUES "
"(?, ?, " "(?, ?, ?, ?, ?)";
"?, ?)";
if (!insertTransaction_.prepareStatement(query, session_.get())) if (!insertTransaction_.prepareStatement(query, session_.get()))
continue; continue;
query.str(""); query.str("");
@@ -1213,7 +1212,7 @@ CassandraBackend::open(bool readOnly)
continue; continue;
query.str(""); query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM " query << "SELECT transaction, metadata, ledger_sequence, date FROM "
<< tablePrefix << "transactions" << tablePrefix << "transactions"
<< " WHERE hash = ?"; << " WHERE hash = ?";
if (!selectTransaction_.prepareStatement(query, session_.get())) if (!selectTransaction_.prepareStatement(query, session_.get()))

View File

@@ -806,7 +806,11 @@ public:
BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows";
return {}; return {};
} }
return {{result.getBytes(), result.getBytes(), result.getUInt32()}}; return {
{result.getBytes(),
result.getBytes(),
result.getUInt32(),
result.getUInt32()}};
} }
LedgerPage LedgerPage
doFetchLedgerPage( doFetchLedgerPage(
@@ -841,6 +845,7 @@ public:
writeTransaction( writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const override; std::string&& metadata) const override;

View File

@@ -85,15 +85,16 @@ void
PostgresBackend::writeTransaction( PostgresBackend::writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const std::string&& metadata) const
{ {
if (abortWrite_) if (abortWrite_)
return; return;
transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t' transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x" << std::to_string(seq) << '\t' << std::to_string(date)
<< ripple::strHex(transaction) << '\t' << "\\\\x" << '\t' << "\\\\x" << ripple::strHex(transaction)
<< ripple::strHex(metadata) << '\n'; << '\t' << "\\\\x" << ripple::strHex(metadata) << '\n';
} }
uint32_t uint32_t
@@ -275,7 +276,7 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000"); pgQuery("SET statement_timeout TO 10000");
std::stringstream sql; std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq FROM transactions " sql << "SELECT transaction,metadata,ledger_seq,date FROM transactions "
"WHERE hash = " "WHERE hash = "
<< "\'\\x" << ripple::strHex(hash) << "\'"; << "\'\\x" << ripple::strHex(hash) << "\'";
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
@@ -284,7 +285,8 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
return { return {
{res.asUnHexedBlob(0, 0), {res.asUnHexedBlob(0, 0),
res.asUnHexedBlob(0, 1), res.asUnHexedBlob(0, 1),
res.asBigInt(0, 2)}}; res.asBigInt(0, 2),
res.asBigInt(0, 3)}};
} }
return {}; return {};
@@ -295,7 +297,8 @@ PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000"); pgQuery("SET statement_timeout TO 10000");
std::stringstream sql; std::stringstream sql;
sql << "SELECT transaction, metadata, ledger_seq FROM transactions WHERE " sql << "SELECT transaction, metadata, ledger_seq,date FROM transactions "
"WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence); << "ledger_seq = " << std::to_string(ledgerSequence);
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 3)) if (size_t numRows = checkResult(res, 3))
@@ -306,7 +309,8 @@ PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
txns.push_back( txns.push_back(
{res.asUnHexedBlob(i, 0), {res.asUnHexedBlob(i, 0),
res.asUnHexedBlob(i, 1), res.asUnHexedBlob(i, 1),
res.asBigInt(i, 2)}); res.asBigInt(i, 2),
res.asBigInt(i, 3)});
} }
return txns; return txns;
} }
@@ -410,7 +414,7 @@ PostgresBackend::fetchTransactions(
<< __func__ << " getting txn = " << i; << __func__ << " getting txn = " << i;
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
std::stringstream sql; std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq FROM " sql << "SELECT transaction,metadata,ledger_seq,date FROM "
"transactions " "transactions "
"WHERE HASH = \'\\x" "WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'"; << ripple::strHex(hash) << "\'";
@@ -421,7 +425,8 @@ PostgresBackend::fetchTransactions(
results[i] = { results[i] = {
res.asUnHexedBlob(0, 0), res.asUnHexedBlob(0, 0),
res.asUnHexedBlob(0, 1), res.asUnHexedBlob(0, 1),
res.asBigInt(0, 2)}; res.asBigInt(0, 2),
res.asBigInt(0, 3)};
} }
if (--numRemaining == 0) if (--numRemaining == 0)
{ {
@@ -447,7 +452,7 @@ PostgresBackend::fetchTransactions(
for (size_t i = 0; i < hashes.size(); ++i) for (size_t i = 0; i < hashes.size(); ++i)
{ {
auto const& hash = hashes[i]; auto const& hash = hashes[i];
sql << "SELECT transaction,metadata,ledger_seq FROM " sql << "SELECT transaction,metadata,ledger_seq,date FROM "
"transactions " "transactions "
"WHERE HASH = \'\\x" "WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'"; << ripple::strHex(hash) << "\'";
@@ -468,7 +473,8 @@ PostgresBackend::fetchTransactions(
results.push_back( results.push_back(
{res.asUnHexedBlob(i, 0), {res.asUnHexedBlob(i, 0),
res.asUnHexedBlob(i, 1), res.asUnHexedBlob(i, 1),
res.asBigInt(i, 2)}); res.asBigInt(i, 2),
res.asBigInt(i, 3)});
} }
} }
return results; return results;

View File

@@ -76,15 +76,14 @@ public:
bool isFirst) const override; bool isFirst) const override;
void void
doWriteLedgerObject( doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob)
std::string&& key, const override;
uint32_t seq,
std::string&& blob) const override;
void void
writeTransaction( writeTransaction(
std::string&& hash, std::string&& hash,
uint32_t seq, uint32_t seq,
uint32_t date,
std::string&& transaction, std::string&& transaction,
std::string&& metadata) const override; std::string&& metadata) const override;

View File

@@ -79,6 +79,7 @@ ReportingETL::insertTransactions(
backend_->writeTransaction( backend_->writeTransaction(
std::move(keyStr), std::move(keyStr),
ledger.seq, ledger.seq,
ledger.closeTime.time_since_epoch().count(),
std::move(*raw), std::move(*raw),
std::move(*txn.mutable_metadata_blob())); std::move(*txn.mutable_metadata_blob()));
} }

View File

@@ -256,6 +256,7 @@ TEST(BackendTest, Basic)
backend->writeTransaction( backend->writeTransaction(
std::move(std::string{hashBlob}), std::move(std::string{hashBlob}),
lgrInfoNext.seq, lgrInfoNext.seq,
lgrInfoNext.closeTime.time_since_epoch().count(),
std::move(std::string{txnBlob}), std::move(std::string{txnBlob}),
std::move(std::string{metaBlob})); std::move(std::string{metaBlob}));
backend->writeAccountTransactions(std::move(accountTxData)); backend->writeAccountTransactions(std::move(accountTxData));
@@ -470,6 +471,7 @@ TEST(BackendTest, Basic)
backend->writeTransaction( backend->writeTransaction(
std::move(hash), std::move(hash),
lgrInfo.seq, lgrInfo.seq,
lgrInfo.closeTime.time_since_epoch().count(),
std::move(txn), std::move(txn),
std::move(meta)); std::move(meta));
} }
@@ -507,7 +509,7 @@ TEST(BackendTest, Basic)
for (auto [hash, txn, meta] : txns) for (auto [hash, txn, meta] : txns)
{ {
bool found = false; bool found = false;
for (auto [retTxn, retMeta, retSeq] : retTxns) for (auto [retTxn, retMeta, retSeq, retDate] : retTxns)
{ {
if (std::strncmp( if (std::strncmp(
(const char*)retTxn.data(), (const char*)retTxn.data(),
@@ -538,7 +540,7 @@ TEST(BackendTest, Basic)
EXPECT_EQ(retData.size(), data.size()); EXPECT_EQ(retData.size(), data.size());
for (size_t i = 0; i < retData.size(); ++i) for (size_t i = 0; i < retData.size(); ++i)
{ {
auto [txn, meta, seq] = retData[i]; auto [txn, meta, seq, date] = retData[i];
auto [hash, expTxn, expMeta] = data[i]; auto [hash, expTxn, expMeta] = data[i];
EXPECT_STREQ( EXPECT_STREQ(
(const char*)txn.data(), (const char*)expTxn.data()); (const char*)txn.data(), (const char*)expTxn.data());