From cc76ece6f287ca2ea1385c22f49e23e68940319e Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 7 Sep 2021 10:46:10 -0400 Subject: [PATCH] add date to transaction tables --- src/backend/BackendInterface.h | 14 ++++++------ src/backend/CassandraBackend.cpp | 37 ++++++++++++++++---------------- src/backend/CassandraBackend.h | 7 +++++- src/backend/PostgresBackend.cpp | 28 ++++++++++++++---------- src/backend/PostgresBackend.h | 7 +++--- src/etl/ReportingETL.cpp | 1 + unittests/main.cpp | 6 ++++-- 7 files changed, 55 insertions(+), 45 deletions(-) diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index 9a6b5b6b..f444cb85 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -36,6 +36,7 @@ struct TransactionAndMetadata Blob transaction; Blob metadata; uint32_t ledgerSequence; + uint32_t date; bool operator==(const TransactionAndMetadata&) const = default; }; @@ -201,15 +202,14 @@ protected: bool isFirst = false) const = 0; void - writeLedgerObject( - std::string&& key, - uint32_t seq, - std::string&& blob) const; + writeLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) + const; virtual void writeTransaction( std::string&& hash, uint32_t seq, + uint32_t date, std::string&& transaction, std::string&& metadata) const = 0; @@ -257,10 +257,8 @@ private: std::uint32_t limit) const = 0; virtual void - doWriteLedgerObject( - std::string&& key, - uint32_t seq, - std::string&& blob) const = 0; + doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) + const = 0; virtual bool doFinishWrites() const = 0; diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 312e91b4..a53e2e98 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -226,6 +226,7 @@ void CassandraBackend::writeTransaction( std::string&& hash, uint32_t seq, + uint32_t date, std::string&& transaction, std::string&& metadata) const { @@ -242,12 +243,17 @@ CassandraBackend::writeTransaction( makeAndExecuteAsyncWrite( this, std::move(std::make_tuple( - std::move(hash), seq, std::move(transaction), std::move(metadata))), + std::move(hash), + seq, + date, + std::move(transaction), + std::move(metadata))), [this](auto& params) { CassandraStatement statement{insertTransaction_}; - auto& [hash, sequence, transaction, metadata] = params.data; + auto& [hash, sequence, date, transaction, metadata] = params.data; statement.bindNextBytes(hash); statement.bindNextInt(sequence); + statement.bindNextInt(date); statement.bindNextBytes(transaction); statement.bindNextBytes(metadata); return statement; @@ -346,6 +352,7 @@ CassandraBackend::fetchTransactions( results[i] = { result.getBytes(), result.getBytes(), + result.getUInt32(), result.getUInt32()}; })); executeAsyncRead(statement, processAsyncRead, *cbs[i]); @@ -1058,11 +1065,11 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); - query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" - << " ( hash blob PRIMARY KEY, ledger_sequence bigint, " - "transaction " - "blob, metadata blob)" - << " WITH default_time_to_live = " << std::to_string(ttl); + query + << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" + << " ( hash blob PRIMARY KEY, ledger_sequence bigint, date bigint, " + "transaction blob, metadata blob)" + << " WITH default_time_to_live = " << std::to_string(ttl); if (!executeSimpleStatement(query.str())) continue; query.str(""); @@ -1079,15 +1086,8 @@ CassandraBackend::open(bool readOnly) << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query.str(""); - query << "CREATE INDEX ON " << tablePrefix - << "transactions(ledger_sequence)"; - if (!executeSimpleStatement(query.str())) - continue; - query.str(""); - query << "SELECT * FROM " << tablePrefix - << "transactions WHERE ledger_sequence = 1" + query << "SELECT * FROM " << tablePrefix << "ledger_transactions" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; @@ -1179,9 +1179,8 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "INSERT INTO " << tablePrefix << "transactions" - << " (hash, ledger_sequence, transaction, metadata) VALUES " - "(?, ?, " - "?, ?)"; + << " (hash, ledger_sequence, date, transaction, metadata) VALUES " + "(?, ?, ?, ?, ?)"; if (!insertTransaction_.prepareStatement(query, session_.get())) continue; query.str(""); @@ -1213,7 +1212,7 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); - query << "SELECT transaction, metadata, ledger_sequence FROM " + query << "SELECT transaction, metadata, ledger_sequence, date FROM " << tablePrefix << "transactions" << " WHERE hash = ?"; if (!selectTransaction_.prepareStatement(query, session_.get())) diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 429a3170..840af709 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -806,7 +806,11 @@ public: BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; return {}; } - return {{result.getBytes(), result.getBytes(), result.getUInt32()}}; + return { + {result.getBytes(), + result.getBytes(), + result.getUInt32(), + result.getUInt32()}}; } LedgerPage doFetchLedgerPage( @@ -841,6 +845,7 @@ public: writeTransaction( std::string&& hash, uint32_t seq, + uint32_t date, std::string&& transaction, std::string&& metadata) const override; diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index d846a705..73b68d72 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -85,15 +85,16 @@ void PostgresBackend::writeTransaction( std::string&& hash, uint32_t seq, + uint32_t date, std::string&& transaction, std::string&& metadata) const { if (abortWrite_) return; transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t' - << std::to_string(seq) << '\t' << "\\\\x" - << ripple::strHex(transaction) << '\t' << "\\\\x" - << ripple::strHex(metadata) << '\n'; + << std::to_string(seq) << '\t' << std::to_string(date) + << '\t' << "\\\\x" << ripple::strHex(transaction) + << '\t' << "\\\\x" << ripple::strHex(metadata) << '\n'; } uint32_t @@ -275,7 +276,7 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const PgQuery pgQuery(pgPool_); pgQuery("SET statement_timeout TO 10000"); std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_seq FROM transactions " + sql << "SELECT transaction,metadata,ledger_seq,date FROM transactions " "WHERE hash = " << "\'\\x" << ripple::strHex(hash) << "\'"; auto res = pgQuery(sql.str().data()); @@ -284,7 +285,8 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const return { {res.asUnHexedBlob(0, 0), res.asUnHexedBlob(0, 1), - res.asBigInt(0, 2)}}; + res.asBigInt(0, 2), + res.asBigInt(0, 3)}}; } return {}; @@ -295,7 +297,8 @@ PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const PgQuery pgQuery(pgPool_); pgQuery("SET statement_timeout TO 10000"); std::stringstream sql; - sql << "SELECT transaction, metadata, ledger_seq FROM transactions WHERE " + sql << "SELECT transaction, metadata, ledger_seq,date FROM transactions " + "WHERE " << "ledger_seq = " << std::to_string(ledgerSequence); auto res = pgQuery(sql.str().data()); if (size_t numRows = checkResult(res, 3)) @@ -306,7 +309,8 @@ PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const txns.push_back( {res.asUnHexedBlob(i, 0), res.asUnHexedBlob(i, 1), - res.asBigInt(i, 2)}); + res.asBigInt(i, 2), + res.asBigInt(i, 3)}); } return txns; } @@ -410,7 +414,7 @@ PostgresBackend::fetchTransactions( << __func__ << " getting txn = " << i; PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_seq FROM " + sql << "SELECT transaction,metadata,ledger_seq,date FROM " "transactions " "WHERE HASH = \'\\x" << ripple::strHex(hash) << "\'"; @@ -421,7 +425,8 @@ PostgresBackend::fetchTransactions( results[i] = { res.asUnHexedBlob(0, 0), res.asUnHexedBlob(0, 1), - res.asBigInt(0, 2)}; + res.asBigInt(0, 2), + res.asBigInt(0, 3)}; } if (--numRemaining == 0) { @@ -447,7 +452,7 @@ PostgresBackend::fetchTransactions( for (size_t i = 0; i < hashes.size(); ++i) { auto const& hash = hashes[i]; - sql << "SELECT transaction,metadata,ledger_seq FROM " + sql << "SELECT transaction,metadata,ledger_seq,date FROM " "transactions " "WHERE HASH = \'\\x" << ripple::strHex(hash) << "\'"; @@ -468,7 +473,8 @@ PostgresBackend::fetchTransactions( results.push_back( {res.asUnHexedBlob(i, 0), res.asUnHexedBlob(i, 1), - res.asBigInt(i, 2)}); + res.asBigInt(i, 2), + res.asBigInt(i, 3)}); } } return results; diff --git a/src/backend/PostgresBackend.h b/src/backend/PostgresBackend.h index dcf06f2c..248be5c8 100644 --- a/src/backend/PostgresBackend.h +++ b/src/backend/PostgresBackend.h @@ -76,15 +76,14 @@ public: bool isFirst) const override; void - doWriteLedgerObject( - std::string&& key, - uint32_t seq, - std::string&& blob) const override; + doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) + const override; void writeTransaction( std::string&& hash, uint32_t seq, + uint32_t date, std::string&& transaction, std::string&& metadata) const override; diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 82d6a121..3bb3a9fb 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -79,6 +79,7 @@ ReportingETL::insertTransactions( backend_->writeTransaction( std::move(keyStr), ledger.seq, + ledger.closeTime.time_since_epoch().count(), std::move(*raw), std::move(*txn.mutable_metadata_blob())); } diff --git a/unittests/main.cpp b/unittests/main.cpp index 7539c105..4dd41d1b 100644 --- a/unittests/main.cpp +++ b/unittests/main.cpp @@ -256,6 +256,7 @@ TEST(BackendTest, Basic) backend->writeTransaction( std::move(std::string{hashBlob}), lgrInfoNext.seq, + lgrInfoNext.closeTime.time_since_epoch().count(), std::move(std::string{txnBlob}), std::move(std::string{metaBlob})); backend->writeAccountTransactions(std::move(accountTxData)); @@ -470,6 +471,7 @@ TEST(BackendTest, Basic) backend->writeTransaction( std::move(hash), lgrInfo.seq, + lgrInfo.closeTime.time_since_epoch().count(), std::move(txn), std::move(meta)); } @@ -507,7 +509,7 @@ TEST(BackendTest, Basic) for (auto [hash, txn, meta] : txns) { bool found = false; - for (auto [retTxn, retMeta, retSeq] : retTxns) + for (auto [retTxn, retMeta, retSeq, retDate] : retTxns) { if (std::strncmp( (const char*)retTxn.data(), @@ -538,7 +540,7 @@ TEST(BackendTest, Basic) EXPECT_EQ(retData.size(), data.size()); for (size_t i = 0; i < retData.size(); ++i) { - auto [txn, meta, seq] = retData[i]; + auto [txn, meta, seq, date] = retData[i]; auto [hash, expTxn, expMeta] = data[i]; EXPECT_STREQ( (const char*)txn.data(), (const char*)expTxn.data());