Mirror of https://github.com/XRPLF/rippled.git (synced 2025-12-06 17:27:55 +00:00)
Fixed a file-not-found issue when a log file is moved to archive.

Summary: Fixed a file-not-found issue when a log file is moved to archive, by adding a missing retry.

Test Plan:
make db_test
export ROCKSDB_TEST=TransactionLogIteratorRace
./db_test

Reviewers: sdong, haobo
Reviewed By: sdong
CC: igor, leveldb
Differential Revision: https://reviews.facebook.net/D18669
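Note: the race being fixed is a check-then-use gap: GetSortedWalsOfType first lists the live WAL files and only later stats each one, and the purge thread can move a file into the archive directory in between, so the stat fails with file-not-found. A minimal, self-contained sketch of the retry idea, using plain std::filesystem rather than RocksDB's Env API (the directory names and the WalFileSize helper are illustrative, not RocksDB code):

// Sketch of the retry: stat a WAL file by its live path and, if it has
// vanished, fall back to the archived path before reporting an error.
// Illustrative only -- directory names and helpers are not RocksDB's.
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <optional>
#include <string>

namespace fs = std::filesystem;

// Return the file size, first trying the live dir, then the archive dir.
std::optional<std::uintmax_t> WalFileSize(const fs::path& live_dir,
                                          const fs::path& archive_dir,
                                          const std::string& name) {
  std::error_code ec;
  auto size = fs::file_size(live_dir / name, ec);
  if (!ec) return size;
  // Re-try in case the live log file was moved to the archive between
  // the directory listing and this stat call.
  size = fs::file_size(archive_dir / name, ec);
  if (!ec) return size;
  return std::nullopt;  // gone from both places: a real error
}

int main() {
  if (auto s = WalFileSize("wal", "wal/archive", "000003.log")) {
    std::cout << "size: " << *s << '\n';
  } else {
    std::cout << "file not found in live or archive dir\n";
  }
}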
@@ -935,8 +935,19 @@ Status DBImpl::GetSortedWalsOfType(const std::string& path,
       continue;
     }

+    // Reproduce the race condition where a log file is moved
+    // to archived dir, between these two sync points, used in
+    // (DBTest,TransactionLogIteratorRace)
+    TEST_SYNC_POINT("DBImpl::GetSortedWalsOfType:1");
+    TEST_SYNC_POINT("DBImpl::GetSortedWalsOfType:2");
+
     uint64_t size_bytes;
     s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
+    // re-try in case the alive log file has been moved to archive.
+    if (!s.ok() && log_type == kAliveLogFile &&
+        env_->FileExists(ArchivedLogFileName(path, number))) {
+      s = env_->GetFileSize(ArchivedLogFileName(path, number), &size_bytes);
+    }
     if (!s.ok()) {
       return s;
     }

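Note: TEST_SYNC_POINT marks a named point in the code; it compiles away in release builds, and in debug builds a test can install happens-before edges between named points so that a chosen thread interleaving is forced deterministically. A minimal sketch of how such a mechanism can work, built from a mutex and a condition variable (this is an illustration, not RocksDB's sync_point.h):

// Minimal sync-point mechanism: Process(name) blocks until every
// registered predecessor of `name` has itself been processed.
// Illustrative sketch, not RocksDB's actual sync_point.h.
#include <condition_variable>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

class MiniSyncPoint {
 public:
  // Each {predecessor, successor} pair: successor waits for predecessor.
  void LoadDependency(
      const std::vector<std::pair<std::string, std::string>>& deps) {
    std::lock_guard<std::mutex> l(mu_);
    for (const auto& d : deps) preds_[d.second].insert(d.first);
  }
  void Process(const std::string& point) {
    std::unique_lock<std::mutex> l(mu_);
    auto it = preds_.find(point);
    if (it != preds_.end()) {
      cv_.wait(l, [&] {
        for (const auto& p : it->second)
          if (!cleared_.count(p)) return false;
        return true;
      });
    }
    cleared_.insert(point);
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::map<std::string, std::set<std::string>> preds_;
  std::set<std::string> cleared_;
};

int main() {
  MiniSyncPoint sp;
  // Mover may run only inside the scanner's window between :1 and :2.
  sp.LoadDependency({{"Scan:1", "Move:1"}, {"Move:2", "Scan:2"}});
  std::thread mover([&] {
    sp.Process("Move:1");  // waits for the scanner to reach :1
    std::cout << "moving log file to archive\n";
    sp.Process("Move:2");  // lets the scanner continue
  });
  sp.Process("Scan:1");    // scanner has listed the files
  sp.Process("Scan:2");    // waits for the move to finish
  std::cout << "scanner stats the (now archived) file\n";
  mover.join();
}

The fix in the hunk above has to survive exactly this interleaving: the file is archived between the directory listing and the GetFileSize call.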
@@ -5609,48 +5609,56 @@ TEST(DBTest, TransactionLogIterator) {

 #ifndef NDEBUG // sync point is not included with DNDEBUG build
 TEST(DBTest, TransactionLogIteratorRace) {
-  // Setup sync point dependency to reproduce the race condition of
-  // a log file moved to archived dir, in the middle of GetSortedWalFiles
-  rocksdb::SyncPoint::GetInstance()->LoadDependency(
-    { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" },
-      { "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" },
-    });
+  static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
+  static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] =
+    { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1",
+        "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" },
+      { "DBImpl::GetSortedWalsOfType:1", "DBImpl::PurgeObsoleteFiles:1",
+        "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalsOfType:2" }};
+  for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
+    // Setup sync point dependency to reproduce the race condition of
+    // a log file moved to archived dir, in the middle of GetSortedWalFiles
+    rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      { { sync_points[test][0], sync_points[test][1] },
+        { sync_points[test][2], sync_points[test][3] },
+      });

-  do {
-    rocksdb::SyncPoint::GetInstance()->ClearTrace();
-    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-    Options options = OptionsForLogIterTest();
-    DestroyAndReopen(&options);
-    Put("key1", DummyString(1024));
-    dbfull()->Flush(FlushOptions());
-    Put("key2", DummyString(1024));
-    dbfull()->Flush(FlushOptions());
-    Put("key3", DummyString(1024));
-    dbfull()->Flush(FlushOptions());
-    Put("key4", DummyString(1024));
-    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
+    do {
+      rocksdb::SyncPoint::GetInstance()->ClearTrace();
+      rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+      Options options = OptionsForLogIterTest();
+      DestroyAndReopen(&options);
+      Put("key1", DummyString(1024));
+      dbfull()->Flush(FlushOptions());
+      Put("key2", DummyString(1024));
+      dbfull()->Flush(FlushOptions());
+      Put("key3", DummyString(1024));
+      dbfull()->Flush(FlushOptions());
+      Put("key4", DummyString(1024));
+      ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);

-    {
-      auto iter = OpenTransactionLogIter(0);
-      ExpectRecords(4, iter);
-    }
+      {
+        auto iter = OpenTransactionLogIter(0);
+        ExpectRecords(4, iter);
+      }

-    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-    // trigger async flush, and log move. Well, log move will
-    // wait until the GetSortedWalFiles:1 to reproduce the race
-    // condition
-    FlushOptions flush_options;
-    flush_options.wait = false;
-    dbfull()->Flush(flush_options);
+      rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+      // trigger async flush, and log move. Well, log move will
+      // wait until the GetSortedWalFiles:1 to reproduce the race
+      // condition
+      FlushOptions flush_options;
+      flush_options.wait = false;
+      dbfull()->Flush(flush_options);

-    // "key5" would be written in a new memtable and log
-    Put("key5", DummyString(1024));
-    {
-      // this iter would miss "key4" if not fixed
-      auto iter = OpenTransactionLogIter(0);
-      ExpectRecords(5, iter);
-    }
-  } while (ChangeCompactOptions());
+      // "key5" would be written in a new memtable and log
+      Put("key5", DummyString(1024));
+      {
+        // this iter would miss "key4" if not fixed
+        auto iter = OpenTransactionLogIter(0);
+        ExpectRecords(5, iter);
+      }
+    } while (ChangeCompactOptions());
+  }
 }
 #endif

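Note: each LoadDependency pair reads "the right-hand point may not proceed until the left-hand point has been reached". The test installs GetSortedWalFiles:1 -> PurgeObsoleteFiles:1 and PurgeObsoleteFiles:2 -> GetSortedWalFiles:2 (and a second iteration using the GetSortedWalsOfType points), pinning the log move inside the scan's window. The two edges can be modeled with one-shot signals (a sketch; the thread and variable names mirror the sync points and are illustrative, not real APIs):

// The two dependency edges from the test, modeled with one-shot signals:
//   GetSortedWalFiles:1  -> PurgeObsoleteFiles:1   (move waits for scan)
//   PurgeObsoleteFiles:2 -> GetSortedWalFiles:2    (scan waits for move)
// Illustrative sketch only.
#include <future>
#include <iostream>
#include <thread>

int main() {
  std::promise<void> scan_reached_1;   // set at GetSortedWalFiles:1
  std::promise<void> move_reached_2;   // set at PurgeObsoleteFiles:2
  auto scan_reached_1_f = scan_reached_1.get_future();
  auto move_reached_2_f = move_reached_2.get_future();

  std::thread purge([&] {
    scan_reached_1_f.wait();           // edge 1: wait for the scan
    std::cout << "PurgeObsoleteFiles: moving WAL to archive/\n";
    move_reached_2.set_value();        // edge 2: release the scan
  });

  std::cout << "GetSortedWalFiles: listed the live WAL files\n";
  scan_reached_1.set_value();          // GetSortedWalFiles:1
  move_reached_2_f.wait();             // GetSortedWalFiles:2
  std::cout << "GetSortedWalFiles: stat -> not found -> retry archive/\n";
  purge.join();
}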
@@ -93,6 +93,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
   Status s = OpenLogReader(files_->at(startFileIndex).get());
   if (!s.ok()) {
     currentStatus_ = s;
+    reporter_.Info(currentStatus_.ToString().c_str());
     return;
   }
   while (RestrictedRead(&record, &scratch)) {
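Note: the added reporter_.Info call records why seeking stopped: when OpenLogReader fails (for instance because the WAL it points at was archived mid-iteration), the iterator now logs the status before the existing early return instead of failing silently. The shape of the pattern, with stand-in types rather than RocksDB's Status and reporter:

// Record-log-return on a failed open: the iterator keeps the error in
// its status, reports it, and bails out rather than using a null reader.
// Stand-in types; not RocksDB's Status/Logger.
#include <iostream>
#include <string>

struct Status {
  bool ok_ = true;
  std::string msg_;
  static Status OK() { return {}; }
  static Status NotFound(std::string m) { return {false, std::move(m)}; }
  bool ok() const { return ok_; }
  std::string ToString() const { return ok_ ? "OK" : "NotFound: " + msg_; }
};

struct Iterator {
  Status current_status_ = Status::OK();

  Status OpenLogReader(const std::string& file) {
    // Pretend the file was archived out from under us.
    return Status::NotFound(file);
  }

  void SeekToStartSequence(const std::string& file) {
    Status s = OpenLogReader(file);
    if (!s.ok()) {
      current_status_ = s;                              // remember the error
      std::cerr << current_status_.ToString() << '\n';  // log it
      return;                                           // stop: no reader
    }
    // ... read records ...
  }
};

int main() {
  Iterator it;
  it.SeekToStartSequence("000007.log");
}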