diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index ed2104dd..6224779d 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -743,9 +743,10 @@ CREATE TABLE IF NOT EXISTS ledgers ( CREATE TABLE IF NOT EXISTS objects ( - key bytea PRIMARY KEY, + key bytea NOT NULL, ledger_seq bigint NOT NULL, - object bytea NOT NULL + object bytea NOT NULL, + PRIMARY KEY(key, ledger_seq) ); -- Index for lookups by ledger hash. @@ -759,11 +760,7 @@ CREATE TABLE IF NOT EXISTS transactions ( ledger_seq bigint, transaction bytea, metadata bytea, - transaction_index bigint NOT NULL, - trans_id bytea NOT NULL, - nodestore_hash bytea NOT NULL, - constraint transactions_pkey PRIMARY KEY (ledger_seq, transaction_index), - constraint transactions_fkey FOREIGN KEY (ledger_seq) + FOREIGN KEY (ledger_seq) REFERENCES ledgers (ledger_seq) ON DELETE CASCADE ); diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index 443f6036..0d926ec4 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -138,6 +138,9 @@ private: boost::json::object config_; + mutable uint32_t ledgerSequence_ = 0; + mutable bool isFirstLedger_ = false; + public: CassandraFlatMapBackend(boost::json::object const& config) : config_(config) { @@ -410,33 +413,22 @@ public: { } }; + bool - writeLedger( - ripple::LedgerInfo const& ledgerInfo, - std::string&& header, - bool isFirst = false) const override + finishWrites() const override { - WriteLedgerHeaderCallbackData* headerCb = - new WriteLedgerHeaderCallbackData( - this, ledgerInfo.seq, std::move(header)); - WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData( - this, ledgerInfo.hash, ledgerInfo.seq); - ++numRequestsOutstanding_; - ++numRequestsOutstanding_; - writeLedgerHeader(*headerCb, false); - writeLedgerHash(*hashCb, false); // wait for all other writes to finish sync(); // write range - if (isFirst) + if (isFirstLedger_) { CassStatement* statement = cass_prepared_bind(updateLedgerRange_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); CassError rc = - cass_statement_bind_int64(statement, 0, ledgerInfo.seq); + cass_statement_bind_int64(statement, 0, ledgerSequence_); rc = cass_statement_bind_bool(statement, 1, cass_false); - rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq); + rc = cass_statement_bind_int64(statement, 2, ledgerSequence_); CassFuture* fut; do { @@ -456,11 +448,11 @@ public: CassStatement* statement = cass_prepared_bind(updateLedgerRange_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); // TODO check rc - CassError rc = cass_statement_bind_int64(statement, 0, ledgerInfo.seq); + CassError rc = cass_statement_bind_int64(statement, 0, ledgerSequence_); assert(rc == CASS_OK); rc = cass_statement_bind_bool(statement, 1, cass_true); assert(rc == CASS_OK); - rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq); + rc = cass_statement_bind_int64(statement, 2, ledgerSequence_); assert(rc == CASS_OK); CassFuture* fut; do @@ -500,6 +492,22 @@ public: return success == cass_true; } void + writeLedger( + ripple::LedgerInfo const& ledgerInfo, + std::string&& header, + bool isFirst = false) const override + { + WriteLedgerHeaderCallbackData* headerCb = + new WriteLedgerHeaderCallbackData( + this, ledgerInfo.seq, std::move(header)); + WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData( + this, ledgerInfo.hash, ledgerInfo.seq); + ++numRequestsOutstanding_; + ++numRequestsOutstanding_; + writeLedgerHeader(*headerCb, false); + writeLedgerHash(*hashCb, false); + } + void writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const { { @@ -1958,7 +1966,12 @@ public: } void - sync() const override + startWrites() const override + { + } + + void + sync() const { std::unique_lock lck(syncMutex_); diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 43097bf4..6ec1d3ab 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -114,6 +114,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) << __func__ << " : " << "Deserialized ledger header. " << detail::toString(lgrInfo); + flatMapBackend_->startWrites(); std::vector accountTxData = insertTransactions(lgrInfo, *ledgerData); @@ -128,9 +129,10 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) if (!stopping_) { flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - bool success = flatMapBackend_->writeLedger( + flatMapBackend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header())); } + flatMapBackend_->finishWrites(); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = " << ((end - start).count()) / 1000000000.0; @@ -254,6 +256,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Deserialized ledger header. " << detail::toString(lgrInfo); + flatMapBackend_->startWrites(); std::vector accountTxData{ insertTransactions(lgrInfo, rawData)}; @@ -299,8 +302,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) std::move(bookDir)); } flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - bool success = flatMapBackend_->writeLedger( + flatMapBackend_->writeLedger( lgrInfo, std::move(*rawData.mutable_ledger_header())); + bool success = flatMapBackend_->finishWrites(); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Inserted/modified/deleted all objects. Number of objects = "