make sync interval for Cassandra configurable

This commit is contained in:
CJ Cobb
2022-02-09 15:17:46 +00:00
committed by CJ Cobb
parent 3db9db9354
commit b170ecc990
2 changed files with 38 additions and 21 deletions

View File

@@ -916,6 +916,12 @@ CassandraBackend::open(bool readOnly)
if (getInt("max_requests_outstanding"))
maxRequestsOutstanding = *getInt("max_requests_outstanding");
if (getInt("sync_interval"))
syncInterval_ = *getInt("sync_interval");
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sync interval is " << syncInterval_
<< ". max requests outstanding is " << maxRequestsOutstanding;
cass_cluster_set_request_timeout(cluster, 10000);
rc = cass_cluster_set_queue_size_io(

View File

@@ -609,6 +609,9 @@ private:
CassandraPreparedStatement selectLatestLedger_;
CassandraPreparedStatement selectLedgerRange_;
uint32_t syncInterval_ = 1;
uint32_t lastSync_ = 0;
// maximum number of concurrent in flight requests. New requests will wait
// for earlier requests to finish if this limit is exceeded
std::uint32_t maxRequestsOutstanding = 10000;
@@ -691,30 +694,38 @@ public:
bool
doFinishWrites() const override
{
// wait for all other writes to finish
sync();
// write range
if (!range)
if (!range || lastSync_ == 0 ||
ledgerSequence_ - syncInterval_ == lastSync_)
{
// wait for all other writes to finish
sync();
// write range
if (!range)
{
CassandraStatement statement{updateLedgerRange_};
statement.bindNextInt(ledgerSequence_);
statement.bindNextBoolean(false);
statement.bindNextInt(ledgerSequence_);
executeSyncWrite(statement);
}
CassandraStatement statement{updateLedgerRange_};
statement.bindNextInt(ledgerSequence_);
statement.bindNextBoolean(false);
statement.bindNextInt(ledgerSequence_);
executeSyncWrite(statement);
statement.bindNextBoolean(true);
if (lastSync_ == 0)
statement.bindNextInt(ledgerSequence_ - 1);
else
statement.bindNextInt(lastSync_);
if (!executeSyncUpdate(statement))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Update failed for ledger "
<< std::to_string(ledgerSequence_) << ". Returning";
return false;
}
BOOST_LOG_TRIVIAL(info) << __func__ << " Committed ledger "
<< std::to_string(ledgerSequence_);
lastSync_ = ledgerSequence_;
}
CassandraStatement statement{updateLedgerRange_};
statement.bindNextInt(ledgerSequence_);
statement.bindNextBoolean(true);
statement.bindNextInt(ledgerSequence_ - 1);
if (!executeSyncUpdate(statement))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Update failed for ledger "
<< std::to_string(ledgerSequence_) << ". Returning";
return false;
}
BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger "
<< std::to_string(ledgerSequence_);
return true;
}
void
@@ -911,7 +922,7 @@ public:
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!canAddRequest())
{
BOOST_LOG_TRIVIAL(trace)
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";