From b170ecc9903b9cb5076fb51862cd2120c3bf0eca Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 9 Feb 2022 15:17:46 +0000 Subject: [PATCH] make sync interval for Cassandra configurable --- src/backend/CassandraBackend.cpp | 6 ++++ src/backend/CassandraBackend.h | 53 +++++++++++++++++++------------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 00a0a4698..3112fe2a5 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -916,6 +916,12 @@ CassandraBackend::open(bool readOnly) if (getInt("max_requests_outstanding")) maxRequestsOutstanding = *getInt("max_requests_outstanding"); + if (getInt("sync_interval")) + syncInterval_ = *getInt("sync_interval"); + BOOST_LOG_TRIVIAL(info) + << __func__ << " sync interval is " << syncInterval_ + << ". max requests outstanding is " << maxRequestsOutstanding; + cass_cluster_set_request_timeout(cluster, 10000); rc = cass_cluster_set_queue_size_io( diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index cbbd6bae2..ad53074c7 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -609,6 +609,9 @@ private: CassandraPreparedStatement selectLatestLedger_; CassandraPreparedStatement selectLedgerRange_; + uint32_t syncInterval_ = 1; + uint32_t lastSync_ = 0; + // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded std::uint32_t maxRequestsOutstanding = 10000; @@ -691,30 +694,38 @@ public: bool doFinishWrites() const override { - // wait for all other writes to finish - sync(); - // write range - if (!range) + if (!range || lastSync_ == 0 || + ledgerSequence_ - syncInterval_ == lastSync_) { + // wait for all other writes to finish + sync(); + // write range + if (!range) + { + CassandraStatement statement{updateLedgerRange_}; + statement.bindNextInt(ledgerSequence_); + statement.bindNextBoolean(false); + statement.bindNextInt(ledgerSequence_); + executeSyncWrite(statement); + } CassandraStatement statement{updateLedgerRange_}; statement.bindNextInt(ledgerSequence_); - statement.bindNextBoolean(false); - statement.bindNextInt(ledgerSequence_); - executeSyncWrite(statement); + statement.bindNextBoolean(true); + if (lastSync_ == 0) + statement.bindNextInt(ledgerSequence_ - 1); + else + statement.bindNextInt(lastSync_); + if (!executeSyncUpdate(statement)) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Update failed for ledger " + << std::to_string(ledgerSequence_) << ". Returning"; + return false; + } + BOOST_LOG_TRIVIAL(info) << __func__ << " Committed ledger " + << std::to_string(ledgerSequence_); + lastSync_ = ledgerSequence_; } - CassandraStatement statement{updateLedgerRange_}; - statement.bindNextInt(ledgerSequence_); - statement.bindNextBoolean(true); - statement.bindNextInt(ledgerSequence_ - 1); - if (!executeSyncUpdate(statement)) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Update failed for ledger " - << std::to_string(ledgerSequence_) << ". Returning"; - return false; - } - BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger " - << std::to_string(ledgerSequence_); return true; } void @@ -911,7 +922,7 @@ public: std::unique_lock lck(throttleMutex_); if (!canAddRequest()) { - BOOST_LOG_TRIVIAL(trace) + BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Max outstanding requests reached. " << "Waiting for other requests to finish";