diff --git a/db/c.cc b/db/c.cc index 915a3e80e2..b50e59ecc5 100644 --- a/db/c.cc +++ b/db/c.cc @@ -330,6 +330,20 @@ rocksdb_t* rocksdb_open( return result; } +rocksdb_t* rocksdb_open_for_read_only( + const rocksdb_options_t* options, + const char* name, + unsigned char error_if_log_file_exist, + char** errptr) { + DB* db; + if (SaveError(errptr, DB::OpenForReadOnly(options->rep, std::string(name), &db, error_if_log_file_exist))) { + return nullptr; + } + rocksdb_t* result = new rocksdb_t; + result->rep = db; + return result; +} + void rocksdb_close(rocksdb_t* db) { delete db->rep; delete db; diff --git a/db/db_test.cc b/db/db_test.cc index 188cfff3da..20fdbd290f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5555,9 +5555,6 @@ TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { } while (ChangeCompactOptions()); } -// TODO(kailiu) disable the in non-linux platforms to temporarily solve -// // the unit test failure. -#ifdef OS_LINUX TEST(DBTest, TransactionLogIteratorStallAtLastRecord) { do { Options options = OptionsForLogIterTest(); @@ -5575,7 +5572,6 @@ TEST(DBTest, TransactionLogIteratorStallAtLastRecord) { ASSERT_TRUE(iter->Valid()); } while (ChangeCompactOptions()); } -#endif TEST(DBTest, TransactionLogIteratorJustEmptyFile) { do { diff --git a/db/merge_test.cc b/db/merge_test.cc index 8858977eaa..9bdf54332e 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -17,7 +17,7 @@ #include "db/write_batch_internal.h" #include "utilities/merge_operators.h" #include "util/testharness.h" -#include "utilities/utility_db.h" +#include "utilities/db_ttl.h" using namespace std; using namespace rocksdb; @@ -80,7 +80,6 @@ std::shared_ptr OpenDb(const string& dbname, const bool ttl = false, const size_t max_successive_merges = 0, const uint32_t min_partial_merge_operands = 2) { DB* db; - StackableDB* sdb; Options options; options.create_if_missing = true; options.merge_operator = std::make_shared(); @@ -90,8 +89,9 @@ std::shared_ptr OpenDb(const string& dbname, const bool ttl = false, DestroyDB(dbname, Options()); if (ttl) { cout << "Opening database with TTL\n"; - s = UtilityDB::OpenTtlDB(options, dbname, &sdb); - db = sdb; + DBWithTTL* db_with_ttl; + s = DBWithTTL::Open(options, dbname, &db_with_ttl); + db = db_with_ttl; } else { s = DB::Open(options, dbname, &db); } diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 7f2c082d00..013ee5d2ab 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -83,6 +83,12 @@ extern rocksdb_t* rocksdb_open( const char* name, char** errptr); +extern rocksdb_t* rocksdb_open_for_read_only( + const rocksdb_options_t* options, + const char* name, + unsigned char error_if_log_file_exist, + char** errptr); + extern void rocksdb_close(rocksdb_t* db); extern void rocksdb_put( diff --git a/include/utilities/db_ttl.h b/include/utilities/db_ttl.h new file mode 100644 index 0000000000..e99744d8f9 --- /dev/null +++ b/include/utilities/db_ttl.h @@ -0,0 +1,68 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include +#include + +#include "utilities/stackable_db.h" +#include "rocksdb/db.h" + +namespace rocksdb { + +// Database with TTL support. +// +// USE-CASES: +// This API should be used to open the db when key-values inserted are +// meant to be removed from the db in a non-strict 'ttl' amount of time +// Therefore, this guarantees that key-values inserted will remain in the +// db for >= ttl amount of time and the db will make efforts to remove the +// key-values as soon as possible after ttl seconds of their insertion. +// +// BEHAVIOUR: +// TTL is accepted in seconds +// (int32_t)Timestamp(creation) is suffixed to values in Put internally +// Expired TTL values deleted in compaction only:(Timestamp+ttl=5 +// read_only=true opens in the usual read-only mode. Compactions will not be +// triggered(neither manual nor automatic), so no expired entries removed +// +// CONSTRAINTS: +// Not specifying/passing or non-positive TTL behaves like TTL = infinity +// +// !!!WARNING!!!: +// Calling DB::Open directly to re-open a db created by this API will get +// corrupt values(timestamp suffixed) and no ttl effect will be there +// during the second Open, so use this API consistently to open the db +// Be careful when passing ttl with a small positive value because the +// whole database may be deleted in a small amount of time + +class DBWithTTL : public StackableDB { + public: + virtual Status CreateColumnFamilyWithTtl( + const ColumnFamilyOptions& options, const std::string& column_family_name, + ColumnFamilyHandle** handle, int ttl) = 0; + + static Status Open(const Options& options, const std::string& dbname, + DBWithTTL** dbptr, int32_t ttl = 0, + bool read_only = false); + + static Status Open(const DBOptions& db_options, const std::string& dbname, + const std::vector& column_families, + std::vector* handles, + DBWithTTL** dbptr, std::vector ttls, + bool read_only = false); + + protected: + explicit DBWithTTL(DB* db) : StackableDB(db) {} +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 57f444802b..7927c2a88f 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -21,6 +21,16 @@ class StackableDB : public DB { return db_; } + virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) { + return db_->CreateColumnFamily(options, column_family_name, handle); + } + + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) { + return db_->DropColumnFamily(column_family); + } + using DB::Put; virtual Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, diff --git a/include/utilities/utility_db.h b/include/utilities/utility_db.h index 0d2e11fa8c..f2b99cedf0 100644 --- a/include/utilities/utility_db.h +++ b/include/utilities/utility_db.h @@ -8,55 +8,22 @@ #include #include "utilities/stackable_db.h" +#include "utilities/db_ttl.h" #include "rocksdb/db.h" namespace rocksdb { -// This class contains APIs to open rocksdb with specific support eg. TTL +// Please don't use this class. It's deprecated class UtilityDB { - - public: - // Open the database with TTL support. - // - // USE-CASES: - // This API should be used to open the db when key-values inserted are - // meant to be removed from the db in a non-strict 'ttl' amount of time - // Therefore, this guarantees that key-values inserted will remain in the - // db for >= ttl amount of time and the db will make efforts to remove the - // key-values as soon as possible after ttl seconds of their insertion. - // - // BEHAVIOUR: - // TTL is accepted in seconds - // (int32_t)Timestamp(creation) is suffixed to values in Put internally - // Expired TTL values deleted in compaction only:(Timestamp+ttl=5 - // read_only=true opens in the usual read-only mode. Compactions will not be - // triggered(neither manual nor automatic), so no expired entries removed - // - // CONSTRAINTS: - // Not specifying/passing or non-positive TTL behaves like TTL = infinity - // - // !!!WARNING!!!: - // Calling DB::Open directly to re-open a db created by this API will get - // corrupt values(timestamp suffixed) and no ttl effect will be there - // during the second Open, so use this API consistently to open the db - // Be careful when passing ttl with a small positive value because the - // whole database may be deleted in a small amount of time - static Status OpenTtlDB(const Options& options, - const std::string& name, - StackableDB** dbptr, - int32_t ttl = 0, - bool read_only = false); - - // OpenTtlDB with column family support - static Status OpenTtlDB( - const DBOptions& db_options, const std::string& name, - const std::vector& column_families, - std::vector* handles, StackableDB** dbptr, - std::vector ttls, bool read_only = false); + public: + // This function is here only for backwards compatibility. Please use the + // functions defined in DBWithTTl (utilities/db_ttl.h) + // (deprecated) + __attribute__((deprecated)) static Status OpenTtlDB(const Options& options, + const std::string& name, + StackableDB** dbptr, + int32_t ttl = 0, + bool read_only = false); }; } // namespace rocksdb diff --git a/java/jdb_bench.sh b/java/jdb_bench.sh index 3fb610c203..dba7dbd319 100755 --- a/java/jdb_bench.sh +++ b/java/jdb_bench.sh @@ -1 +1 @@ -java -Djava.library.path=.:../ -cp "rocksdbjni.jar:.:./*" org.rocksdb.benchmark.DbBenchmark $@ +java -server -d64 -XX:NewSize=4m -XX:+AggressiveOpts -Djava.library.path=.:../ -cp "rocksdbjni.jar:.:./*" org.rocksdb.benchmark.DbBenchmark $@ diff --git a/java/org/rocksdb/ReadOptions.java b/java/org/rocksdb/ReadOptions.java index aead12ec86..ffd741a20d 100644 --- a/java/org/rocksdb/ReadOptions.java +++ b/java/org/rocksdb/ReadOptions.java @@ -93,34 +93,6 @@ public class ReadOptions { private native void setFillCache( long handle, boolean fillCache); - /** - * If this option is set and memtable implementation allows, Seek - * might only return keys with the same prefix as the seek-key - * Default: false - * - * @return true if prefix-seek is enabled. - */ - public boolean prefixSeek() { - assert(isInitialized()); - return prefixSeek(nativeHandle_); - } - private native boolean prefixSeek(long handle); - - /** - * If this option is set and memtable implementation allows, Seek - * might only return keys with the same prefix as the seek-key - * - * @param prefixSeek if true, then prefix-seek will be enabled. - * @return the reference to the current ReadOptions. - */ - public ReadOptions setPrefixSeek(boolean prefixSeek) { - assert(isInitialized()); - setPrefixSeek(nativeHandle_, prefixSeek); - return this; - } - private native void setPrefixSeek( - long handle, boolean prefixSeek); - /** * Specify to create a tailing iterator -- a special iterator that has a * view of the complete database (i.e. it can also be used to read newly diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java index 37b08bc15e..84ee34bf1e 100644 --- a/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -162,6 +162,15 @@ public class DbBenchmark { EXISTING } + enum CompressionType { + NONE, + SNAPPY, + ZLIB, + BZIP2, + LZ4, + LZ4HC + } + static { System.loadLibrary("rocksdbjni"); } @@ -435,7 +444,6 @@ public class DbBenchmark { databaseDir_ = (String) flags.get(Flag.db); writesPerSeconds_ = (Integer) flags.get(Flag.writes_per_second); cacheSize_ = (Long) flags.get(Flag.cache_size); - gen_ = new RandomGenerator(randSeed_, compressionRatio_); memtable_ = (String) flags.get(Flag.memtablerep); maxWriteBufferNumber_ = (Integer) flags.get(Flag.max_write_buffer_number); prefixSize_ = (Integer) flags.get(Flag.prefix_size); @@ -446,6 +454,28 @@ public class DbBenchmark { finishLock_ = new Object(); // options.setPrefixSize((Integer)flags_.get(Flag.prefix_size)); // options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix)); + compressionType_ = (String) flags.get(Flag.compression_type); + compression_ = CompressionType.NONE; + try { + if (compressionType_.equals("snappy")) { + System.loadLibrary("snappy"); + } else if (compressionType_.equals("zlib")) { + System.loadLibrary("zlib"); + } else if (compressionType_.equals("bzip2")) { + System.loadLibrary("bzip2"); + } else if (compressionType_.equals("lz4")) { + System.loadLibrary("lz4"); + } else if (compressionType_.equals("lz4hc")) { + System.loadLibrary("lz4hc"); + } + } catch (UnsatisfiedLinkError e) { + System.err.format("Unable to load %s library:%s%n" + + "No compression is used.%n", + compressionType_, e.toString()); + compressionType_ = "none"; + compressionRatio_ = 1.0; + } + gen_ = new RandomGenerator(randSeed_, compressionRatio_); } private void prepareReadOptions(ReadOptions options) { @@ -462,6 +492,8 @@ public class DbBenchmark { options.setCacheSize(cacheSize_); if (!useExisting_) { options.setCreateIfMissing(true); + } else { + options.setCreateIfMissing(false); } if (memtable_.equals("skip_list")) { options.setMemTableConfig(new SkipListMemTableConfig()); @@ -488,6 +520,8 @@ public class DbBenchmark { options.setTableFormatConfig( new PlainTableConfig().setKeySize(keySize_)); } + options.setWriteBufferSize( + (Long)flags_.get(Flag.write_buffer_size)); options.setMaxWriteBufferNumber( (Integer)flags_.get(Flag.max_write_buffer_number)); options.setMaxBackgroundCompactions( @@ -513,7 +547,7 @@ public class DbBenchmark { options.setDisableSeekCompaction( (Boolean)flags_.get(Flag.disable_seek_compaction)); options.setDeleteObsoleteFilesPeriodMicros( - (Long)flags_.get(Flag.delete_obsolete_files_period_micros)); + (Integer)flags_.get(Flag.delete_obsolete_files_period_micros)); options.setTableCacheNumshardbits( (Integer)flags_.get(Flag.table_cache_numshardbits)); options.setAllowMmapReads( @@ -640,12 +674,12 @@ public class DbBenchmark { } else if (benchmark.equals("readseq")) { for (int t = 0; t < threadNum_; ++t) { tasks.add(new ReadSequentialTask( - currentTaskId++, randSeed_, reads_, num_)); + currentTaskId++, randSeed_, reads_ / threadNum_, num_)); } } else if (benchmark.equals("readrandom")) { for (int t = 0; t < threadNum_; ++t) { tasks.add(new ReadRandomTask( - currentTaskId++, randSeed_, reads_, num_)); + currentTaskId++, randSeed_, reads_ / threadNum_, num_)); } } else if (benchmark.equals("readwhilewriting")) { WriteTask writeTask = new WriteRandomTask( @@ -717,12 +751,12 @@ public class DbBenchmark { (int) (valueSize_ * compressionRatio_ + 0.5)); System.out.printf("Entries: %d\n", num_); System.out.printf("RawSize: %.1f MB (estimated)\n", - ((kKeySize + valueSize_) * num_) / 1048576.0); + ((double)(kKeySize + valueSize_) * num_) / SizeUnit.MB); System.out.printf("FileSize: %.1f MB (estimated)\n", - (((kKeySize + valueSize_ * compressionRatio_) * num_) - / 1048576.0)); + (((kKeySize + valueSize_ * compressionRatio_) * num_) / SizeUnit.MB)); System.out.format("Memtable Factory: %s%n", options.memTableFactoryName()); System.out.format("Prefix: %d bytes%n", prefixSize_); + System.out.format("Compression: %s%n", compressionType_); printWarnings(); System.out.printf("------------------------------------------------\n"); } @@ -769,7 +803,7 @@ public class DbBenchmark { System.out.printf( "%-16s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n", - benchmark, elapsedSeconds * 1e6 / stats.done_, + benchmark, (double) elapsedSeconds / stats.done_ * 1e6, (stats.bytes_ / 1048576.0) / elapsedSeconds, taskFinishedCount, concurrentThreads); } @@ -932,7 +966,7 @@ public class DbBenchmark { return Integer.parseInt(value); } }, - write_buffer_size(4 << 20, + write_buffer_size(4 * SizeUnit.MB, "Number of bytes to buffer in memtable before compacting\n" + "\t(initialized to default value by 'main'.)") { @Override public Object parseValue(String value) { @@ -1275,11 +1309,17 @@ public class DbBenchmark { return Boolean.parseBoolean(value); } }, - delete_obsolete_files_period_micros(0L,"Option to delete\n" + + delete_obsolete_files_period_micros(0,"Option to delete\n" + "\tobsolete files periodically. 0 means that obsolete files are\n" + "\tdeleted after every compaction run.") { @Override public Object parseValue(String value) { - return Long.parseLong(value); + return Integer.parseInt(value); + } + }, + compression_type("snappy", + "Algorithm used to compress the database.") { + @Override public Object parseValue(String value) { + return value; } }, compression_level(-1, @@ -1512,7 +1552,7 @@ public class DbBenchmark { final long cacheSize_; final boolean useExisting_; final String databaseDir_; - final double compressionRatio_; + double compressionRatio_; RandomGenerator gen_; long startTime_; @@ -1532,4 +1572,6 @@ public class DbBenchmark { // as the scope of a static member equals to the scope of the problem, // we let its c++ pointer to be disposed in its finalizer. static Options defaultOptions_ = new Options(); + String compressionType_; + CompressionType compression_; } diff --git a/java/org/rocksdb/test/ReadOptionsTest.java b/java/org/rocksdb/test/ReadOptionsTest.java index 1fb6c51f16..501eda6cf7 100644 --- a/java/org/rocksdb/test/ReadOptionsTest.java +++ b/java/org/rocksdb/test/ReadOptionsTest.java @@ -27,12 +27,6 @@ public class ReadOptionsTest { assert(opt.fillCache() == boolValue); } - { // PrefixSeek test - boolean boolValue = rand.nextBoolean(); - opt.setPrefixSeek(boolValue); - assert(opt.prefixSeek() == boolValue); - } - { // Tailing test boolean boolValue = rand.nextBoolean(); opt.setTailing(boolValue); diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index a05a74e7a6..c5849ce395 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -1785,27 +1785,6 @@ void Java_org_rocksdb_ReadOptions_setFillCache( static_cast(jfill_cache); } -/* - * Class: org_rocksdb_ReadOptions - * Method: prefixSeek - * Signature: (J)Z - */ -jboolean Java_org_rocksdb_ReadOptions_prefixSeek( - JNIEnv* env, jobject jobj, jlong jhandle) { - return reinterpret_cast(jhandle)->prefix_seek; -} - -/* - * Class: org_rocksdb_ReadOptions - * Method: setPrefixSeek - * Signature: (JZ)V - */ -void Java_org_rocksdb_ReadOptions_setPrefixSeek( - JNIEnv* env, jobject jobj, jlong jhandle, jboolean jprefix_seek) { - reinterpret_cast(jhandle)->prefix_seek = - static_cast(jprefix_seek); -} - /* * Class: org_rocksdb_ReadOptions * Method: tailing diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index 9d8ba10745..035b35f6f3 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -212,7 +212,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto(b, &cf_mems_default); int count = 0; - rocksdb::Iterator* iter = mem->NewIterator(); + rocksdb::Iterator* iter = mem->NewIterator(rocksdb::ReadOptions()); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { rocksdb::ParsedInternalKey ikey; memset(reinterpret_cast(&ikey), 0, sizeof(ikey)); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 8b8523f898..c774171d88 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -28,7 +28,7 @@ #include "db/version_set.h" #include "rocksdb/statistics.h" #include "rocksdb/cache.h" -#include "utilities/utility_db.h" +#include "utilities/db_ttl.h" #include "rocksdb/env.h" #include "rocksdb/write_batch.h" #include "rocksdb/slice.h" @@ -42,7 +42,6 @@ #include "util/random.h" #include "util/testutil.h" #include "util/logging.h" -#include "utilities/ttl/db_ttl.h" #include "hdfs/env_hdfs.h" #include "utilities/merge_operators.h" @@ -1620,9 +1619,9 @@ class StressTest { assert(!s.ok() || column_families_.size() == static_cast(FLAGS_column_families)); } else { - StackableDB* sdb; - s = UtilityDB::OpenTtlDB(options_, FLAGS_db, &sdb, FLAGS_ttl); - db_ = sdb; + DBWithTTL* db_with_ttl; + s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); + db_ = db_with_ttl; } if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); diff --git a/util/env_posix.cc b/util/env_posix.cc index 9e76a126dc..5cbd5bd009 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -182,6 +182,9 @@ class PosixSequentialFile: public SequentialFile { if (r < n) { if (feof(file_)) { // We leave status as ok if we hit the end of the file + // We also clear the error so that the reads can continue + // if a new data is written to the file + clearerr(file_); } else { // A partial read with an error: return a non-ok status s = IOError(filename_, errno); diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 98e7bf0868..597179fd94 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -14,6 +14,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/cache.h" #include "util/coding.h" +#include "utilities/ttl/db_ttl_impl.h" #include #include @@ -909,11 +910,11 @@ void DBDumperCommand::DoCommand() { int max_keys = max_keys_; int ttl_start; if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) { - ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time + ttl_start = DBWithTTLImpl::kMinTimestamp; // TTL introduction time } int ttl_end; if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) { - ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature + ttl_end = DBWithTTLImpl::kMaxTimestamp; // Max time allowed by TTL feature } if (ttl_end < ttl_start) { fprintf(stderr, "Error: End time can't be less than start time\n"); @@ -1600,11 +1601,11 @@ void ScanCommand::DoCommand() { } int ttl_start; if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) { - ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time + ttl_start = DBWithTTLImpl::kMinTimestamp; // TTL introduction time } int ttl_end; if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) { - ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature + ttl_end = DBWithTTLImpl::kMaxTimestamp; // Max time allowed by TTL feature } if (ttl_end < ttl_start) { fprintf(stderr, "Error: End time can't be less than start time\n"); diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h index f0ac59158c..4f760e0ceb 100644 --- a/util/ldb_cmd.h +++ b/util/ldb_cmd.h @@ -19,8 +19,8 @@ #include "util/logging.h" #include "util/ldb_cmd_execute_result.h" #include "util/string_util.h" -#include "utilities/utility_db.h" -#include "utilities/ttl/db_ttl.h" +#include "utilities/db_ttl.h" +#include "utilities/ttl/db_ttl_impl.h" using std::string; using std::map; @@ -149,7 +149,7 @@ protected: LDBCommandExecuteResult exec_state_; string db_path_; DB* db_; - StackableDB* sdb_; + DBWithTTL* db_ttl_; /** * true implies that this command can work if the db is opened in read-only @@ -217,11 +217,11 @@ protected: Status st; if (is_db_ttl_) { if (is_read_only_) { - st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_, 0, true); + st = DBWithTTL::Open(opt, db_path_, &db_ttl_, 0, true); } else { - st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_); + st = DBWithTTL::Open(opt, db_path_, &db_ttl_); } - db_ = sdb_; + db_ = db_ttl_; } else if (is_read_only_) { st = DB::OpenForReadOnly(opt, db_path_, &db_); } else { diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/string_append/stringappend_test.cc index b0b5c5b593..a68186a3af 100644 --- a/utilities/merge_operators/string_append/stringappend_test.cc +++ b/utilities/merge_operators/string_append/stringappend_test.cc @@ -14,7 +14,7 @@ #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend2.h" -#include "utilities/ttl/db_ttl.h" +#include "utilities/db_ttl.h" #include "util/testharness.h" #include "util/random.h" @@ -38,11 +38,11 @@ std::shared_ptr OpenNormalDb(char delim_char) { // Open a TtlDB with a non-associative StringAppendTESTOperator std::shared_ptr OpenTtlDb(char delim_char) { - StackableDB* db; + DBWithTTL* db; Options options; options.create_if_missing = true; options.merge_operator.reset(new StringAppendTESTOperator(delim_char)); - ASSERT_OK(UtilityDB::OpenTtlDB(options, kDbName, &db, 123456)); + ASSERT_OK(DBWithTTL::Open(options, kDbName, &db, 123456)); return std::shared_ptr(db); } } // namespace diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl_impl.cc similarity index 57% rename from utilities/ttl/db_ttl.cc rename to utilities/ttl/db_ttl_impl.cc index fef2ec0214..0006b56a43 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -3,16 +3,18 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #ifndef ROCKSDB_LITE -#include "utilities/ttl/db_ttl.h" +#include "utilities/ttl/db_ttl_impl.h" + +#include "utilities/db_ttl.h" #include "db/filename.h" #include "db/write_batch_internal.h" #include "util/coding.h" -#include "include/rocksdb/env.h" -#include "include/rocksdb/iterator.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" namespace rocksdb { -void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) { +void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) { if (options->compaction_filter) { options->compaction_filter = new TtlCompactionFilter(ttl, options->compaction_filter); @@ -28,19 +30,25 @@ void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) { } } -// Open the db inside DBWithTTL because options needs pointer to its ttl -DBWithTTL::DBWithTTL(DB* db) : StackableDB(db) {} +// Open the db inside DBWithTTLImpl because options needs pointer to its ttl +DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {} -DBWithTTL::~DBWithTTL() { - delete GetOptions().compaction_filter; +DBWithTTLImpl::~DBWithTTLImpl() { delete GetOptions().compaction_filter; } + +Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname, + StackableDB** dbptr, int32_t ttl, bool read_only) { + DBWithTTL* db; + Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only); + if (s.ok()) { + *dbptr = db; + } else { + *dbptr = nullptr; + } + return s; } -Status UtilityDB::OpenTtlDB( - const Options& options, - const std::string& dbname, - StackableDB** dbptr, - int32_t ttl, - bool read_only) { +Status DBWithTTL::Open(const Options& options, const std::string& dbname, + DBWithTTL** dbptr, int32_t ttl, bool read_only) { DBOptions db_options(options); ColumnFamilyOptions cf_options(options); @@ -48,8 +56,8 @@ Status UtilityDB::OpenTtlDB( column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; - Status s = UtilityDB::OpenTtlDB(db_options, dbname, column_families, &handles, - dbptr, {ttl}, read_only); + Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles, + dbptr, {ttl}, read_only); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a reference to @@ -59,10 +67,10 @@ Status UtilityDB::OpenTtlDB( return s; } -Status UtilityDB::OpenTtlDB( +Status DBWithTTL::Open( const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, - std::vector* handles, StackableDB** dbptr, + std::vector* handles, DBWithTTL** dbptr, std::vector ttls, bool read_only) { if (ttls.size() != column_families.size()) { @@ -73,7 +81,8 @@ Status UtilityDB::OpenTtlDB( std::vector column_families_sanitized = column_families; for (size_t i = 0; i < column_families_sanitized.size(); ++i) { - DBWithTTL::SanitizeOptions(ttls[i], &column_families_sanitized[i].options); + DBWithTTLImpl::SanitizeOptions(ttls[i], + &column_families_sanitized[i].options); } DB* db; @@ -85,66 +94,81 @@ Status UtilityDB::OpenTtlDB( st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db); } if (st.ok()) { - *dbptr = new DBWithTTL(db); + *dbptr = new DBWithTTLImpl(db); } else { *dbptr = nullptr; } return st; } +Status DBWithTTLImpl::CreateColumnFamilyWithTtl( + const ColumnFamilyOptions& options, const std::string& column_family_name, + ColumnFamilyHandle** handle, int ttl) { + ColumnFamilyOptions sanitized_options = options; + DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options); + + return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name, + handle); +} + +Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) { + return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0); +} + // Gives back the current time -Status DBWithTTL::GetCurrentTime(int64_t& curtime) { - return Env::Default()->GetCurrentTime(&curtime); +Status DBWithTTLImpl::GetCurrentTime(int64_t* curtime) { + return Env::Default()->GetCurrentTime(curtime); } // Appends the current timestamp to the string. // Returns false if could not get the current_time, true if append succeeds -Status DBWithTTL::AppendTS(const Slice& val, std::string& val_with_ts) { - val_with_ts.reserve(kTSLength + val.size()); +Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts) { + val_with_ts->reserve(kTSLength + val.size()); char ts_string[kTSLength]; int64_t curtime; - Status st = GetCurrentTime(curtime); + Status st = GetCurrentTime(&curtime); if (!st.ok()) { return st; } EncodeFixed32(ts_string, (int32_t)curtime); - val_with_ts.append(val.data(), val.size()); - val_with_ts.append(ts_string, kTSLength); + val_with_ts->append(val.data(), val.size()); + val_with_ts->append(ts_string, kTSLength); return st; } // Returns corruption if the length of the string is lesser than timestamp, or // timestamp refers to a time lesser than ttl-feature release time -Status DBWithTTL::SanityCheckTimestamp(const Slice& str) { +Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { if (str.size() < kTSLength) { return Status::Corruption("Error: value's length less than timestamp's\n"); } // Checks that TS is not lesser than kMinTimestamp // Gaurds against corruption & normal database opened incorrectly in ttl mode - int32_t timestamp_value = - DecodeFixed32(str.data() + str.size() - kTSLength); - if (timestamp_value < kMinTimestamp){ + int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength); + if (timestamp_value < kMinTimestamp) { return Status::Corruption("Error: Timestamp < ttl feature release time!\n"); } return Status::OK(); } // Checks if the string is stale or not according to TTl provided -bool DBWithTTL::IsStale(const Slice& value, int32_t ttl) { - if (ttl <= 0) { // Data is fresh if TTL is non-positive +bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl) { + if (ttl <= 0) { // Data is fresh if TTL is non-positive return false; } int64_t curtime; - if (!GetCurrentTime(curtime).ok()) { - return false; // Treat the data as fresh if could not get current time + if (!GetCurrentTime(&curtime).ok()) { + return false; // Treat the data as fresh if could not get current time } int32_t timestamp_value = - DecodeFixed32(value.data() + value.size() - kTSLength); + DecodeFixed32(value.data() + value.size() - kTSLength); return (timestamp_value + ttl) < curtime; } // Strips the TS from the end of the string -Status DBWithTTL::StripTS(std::string* str) { +Status DBWithTTLImpl::StripTS(std::string* str) { Status st; if (str->length() < kTSLength) { return Status::Corruption("Bad timestamp in key-value"); @@ -154,17 +178,17 @@ Status DBWithTTL::StripTS(std::string* str) { return st; } -Status DBWithTTL::Put(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& val) { +Status DBWithTTLImpl::Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& val) { WriteBatch batch; batch.Put(column_family, key, val); return Write(options, &batch); } -Status DBWithTTL::Get(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { +Status DBWithTTLImpl::Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) { Status st = db_->Get(options, column_family, key, value); if (!st.ok()) { return st; @@ -176,18 +200,18 @@ Status DBWithTTL::Get(const ReadOptions& options, return StripTS(value); } -std::vector DBWithTTL::MultiGet( +std::vector DBWithTTLImpl::MultiGet( const ReadOptions& options, const std::vector& column_family, const std::vector& keys, std::vector* values) { - return std::vector(keys.size(), - Status::NotSupported("MultiGet not\ - supported with TTL")); + return std::vector( + keys.size(), Status::NotSupported("MultiGet not supported with TTL")); } -bool DBWithTTL::KeyMayExist(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value, bool* value_found) { +bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value, + bool* value_found) { bool ret = db_->KeyMayExist(options, column_family, key, value, value_found); if (ret && value != nullptr && value_found != nullptr && *value_found) { if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) { @@ -197,15 +221,15 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options, return ret; } -Status DBWithTTL::Merge(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) { +Status DBWithTTLImpl::Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { WriteBatch batch; batch.Merge(column_family, key, value); return Write(options, &batch); } -Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { +Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { class Handler : public WriteBatch::Handler { public: WriteBatch updates_ttl; @@ -213,7 +237,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { std::string value_with_ts; - Status st = AppendTS(value, value_with_ts); + Status st = AppendTS(value, &value_with_ts); if (!st.ok()) { batch_rewrite_status = st; } else { @@ -225,7 +249,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { std::string value_with_ts; - Status st = AppendTS(value, value_with_ts); + Status st = AppendTS(value, &value_with_ts); if (!st.ok()) { batch_rewrite_status = st; } else { @@ -238,9 +262,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); return Status::OK(); } - virtual void LogData(const Slice& blob) { - updates_ttl.PutLogData(blob); - } + virtual void LogData(const Slice& blob) { updates_ttl.PutLogData(blob); } }; Handler handler; updates->Iterate(&handler); @@ -251,8 +273,8 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { } } -Iterator* DBWithTTL::NewIterator(const ReadOptions& opts, - ColumnFamilyHandle* column_family) { +Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts, + ColumnFamilyHandle* column_family) { return new TtlIterator(db_->NewIterator(opts, column_family)); } diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl_impl.h similarity index 60% rename from utilities/ttl/db_ttl.h rename to utilities/ttl/db_ttl_impl.h index 28fd3b41b6..9f7b658226 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -14,17 +14,27 @@ #include "rocksdb/compaction_filter.h" #include "rocksdb/merge_operator.h" #include "utilities/utility_db.h" +#include "utilities/db_ttl.h" #include "db/db_impl.h" namespace rocksdb { -class DBWithTTL : public StackableDB { +class DBWithTTLImpl : public DBWithTTL { public: static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options); - explicit DBWithTTL(DB* db); + explicit DBWithTTLImpl(DB* db); - virtual ~DBWithTTL(); + virtual ~DBWithTTLImpl(); + + Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle, + int ttl) override; + + Status CreateColumnFamily(const ColumnFamilyOptions& options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) override; using StackableDB::Put; virtual Status Put(const WriteOptions& options, @@ -60,83 +70,60 @@ class DBWithTTL : public StackableDB { virtual Iterator* NewIterator(const ReadOptions& opts, ColumnFamilyHandle* column_family) override; - virtual DB* GetBaseDB() { - return db_; - } + virtual DB* GetBaseDB() { return db_; } static bool IsStale(const Slice& value, int32_t ttl); - static Status AppendTS(const Slice& val, std::string& val_with_ts); + static Status AppendTS(const Slice& val, std::string* val_with_ts); static Status SanityCheckTimestamp(const Slice& str); static Status StripTS(std::string* str); - static Status GetCurrentTime(int64_t& curtime); + static Status GetCurrentTime(int64_t* curtime); - static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp + static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp - static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 + static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 - static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8 + static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8 }; class TtlIterator : public Iterator { public: - explicit TtlIterator(Iterator* iter) - : iter_(iter) { - assert(iter_); - } + explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); } - ~TtlIterator() { - delete iter_; - } + ~TtlIterator() { delete iter_; } - bool Valid() const { - return iter_->Valid(); - } + bool Valid() const { return iter_->Valid(); } - void SeekToFirst() { - iter_->SeekToFirst(); - } + void SeekToFirst() { iter_->SeekToFirst(); } - void SeekToLast() { - iter_->SeekToLast(); - } + void SeekToLast() { iter_->SeekToLast(); } - void Seek(const Slice& target) { - iter_->Seek(target); - } + void Seek(const Slice& target) { iter_->Seek(target); } - void Next() { - iter_->Next(); - } + void Next() { iter_->Next(); } - void Prev() { - iter_->Prev(); - } + void Prev() { iter_->Prev(); } - Slice key() const { - return iter_->key(); - } + Slice key() const { return iter_->key(); } int32_t timestamp() const { - return DecodeFixed32( - iter_->value().data() + iter_->value().size() - DBWithTTL::kTSLength); + return DecodeFixed32(iter_->value().data() + iter_->value().size() - + DBWithTTLImpl::kTSLength); } Slice value() const { - //TODO: handle timestamp corruption like in general iterator semantics - assert(DBWithTTL::SanityCheckTimestamp(iter_->value()).ok()); + // TODO: handle timestamp corruption like in general iterator semantics + assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok()); Slice trimmed_value = iter_->value(); - trimmed_value.size_ -= DBWithTTL::kTSLength; + trimmed_value.size_ -= DBWithTTLImpl::kTSLength; return trimmed_value; } - Status status() const { - return iter_->status(); - } + Status status() const { return iter_->status(); } private: Iterator* iter_; @@ -146,13 +133,13 @@ class TtlCompactionFilter : public CompactionFilter { public: TtlCompactionFilter( - int32_t ttl, - const CompactionFilter* user_comp_filter, - std::unique_ptr - user_comp_filter_from_factory = nullptr) - : ttl_(ttl), - user_comp_filter_(user_comp_filter), - user_comp_filter_from_factory_(std::move(user_comp_filter_from_factory)) { + int32_t ttl, const CompactionFilter* user_comp_filter, + std::unique_ptr user_comp_filter_from_factory = + nullptr) + : ttl_(ttl), + user_comp_filter_(user_comp_filter), + user_comp_filter_from_factory_( + std::move(user_comp_filter_from_factory)) { // Unlike the merge operator, compaction filter is necessary for TTL, hence // this would be called even if user doesn't specify any compaction-filter if (!user_comp_filter_) { @@ -160,34 +147,31 @@ class TtlCompactionFilter : public CompactionFilter { } } - virtual bool Filter(int level, - const Slice& key, - const Slice& old_val, - std::string* new_val, - bool* value_changed) const override { - if (DBWithTTL::IsStale(old_val, ttl_)) { + virtual bool Filter(int level, const Slice& key, const Slice& old_val, + std::string* new_val, bool* value_changed) const + override { + if (DBWithTTLImpl::IsStale(old_val, ttl_)) { return true; } if (user_comp_filter_ == nullptr) { return false; } - assert(old_val.size() >= DBWithTTL::kTSLength); + assert(old_val.size() >= DBWithTTLImpl::kTSLength); Slice old_val_without_ts(old_val.data(), - old_val.size() - DBWithTTL::kTSLength); + old_val.size() - DBWithTTLImpl::kTSLength); if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val, value_changed)) { return true; } if (*value_changed) { - new_val->append(old_val.data() + old_val.size() - DBWithTTL::kTSLength, - DBWithTTL::kTSLength); + new_val->append( + old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength, + DBWithTTLImpl::kTSLength); } return false; } - virtual const char* Name() const override { - return "Delete By TTL"; - } + virtual const char* Name() const override { return "Delete By TTL"; } private: int32_t ttl_; @@ -196,47 +180,40 @@ class TtlCompactionFilter : public CompactionFilter { }; class TtlCompactionFilterFactory : public CompactionFilterFactory { - public: - TtlCompactionFilterFactory( - int32_t ttl, - std::shared_ptr comp_filter_factory) - : ttl_(ttl), - user_comp_filter_factory_(comp_filter_factory) { } + public: + TtlCompactionFilterFactory( + int32_t ttl, std::shared_ptr comp_filter_factory) + : ttl_(ttl), user_comp_filter_factory_(comp_filter_factory) {} - virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) { - return std::unique_ptr( - new TtlCompactionFilter( - ttl_, - nullptr, - std::move(user_comp_filter_factory_->CreateCompactionFilter(context)) - ) - ); - } + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) { + return std::unique_ptr(new TtlCompactionFilter( + ttl_, nullptr, + std::move(user_comp_filter_factory_->CreateCompactionFilter(context)))); + } - virtual const char* Name() const override { - return "TtlCompactionFilterFactory"; - } + virtual const char* Name() const override { + return "TtlCompactionFilterFactory"; + } - private: - int32_t ttl_; - std::shared_ptr user_comp_filter_factory_; + private: + int32_t ttl_; + std::shared_ptr user_comp_filter_factory_; }; class TtlMergeOperator : public MergeOperator { public: explicit TtlMergeOperator(const std::shared_ptr merge_op) - : user_merge_op_(merge_op) { + : user_merge_op_(merge_op) { assert(merge_op); } - virtual bool FullMerge(const Slice& key, - const Slice* existing_value, + virtual bool FullMerge(const Slice& key, const Slice* existing_value, const std::deque& operands, - std::string* new_value, - Logger* logger) const override { - const uint32_t ts_len = DBWithTTL::kTSLength; + std::string* new_value, Logger* logger) const + override { + const uint32_t ts_len = DBWithTTLImpl::kTSLength; if (existing_value && existing_value->size() < ts_len) { Log(logger, "Error: Could not remove timestamp from existing value."); return false; @@ -244,7 +221,7 @@ class TtlMergeOperator : public MergeOperator { // Extract time-stamp from each operand to be passed to user_merge_op_ std::deque operands_without_ts; - for (const auto &operand : operands) { + for (const auto& operand : operands) { if (operand.size() < ts_len) { Log(logger, "Error: Could not remove timestamp from operand value."); return false; @@ -271,9 +248,10 @@ class TtlMergeOperator : public MergeOperator { // Augment the *new_value with the ttl time-stamp int64_t curtime; - if (!DBWithTTL::GetCurrentTime(curtime).ok()) { - Log(logger, "Error: Could not get current time to be attached internally " - "to the new value."); + if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) { + Log(logger, + "Error: Could not get current time to be attached internally " + "to the new value."); return false; } else { char ts_string[ts_len]; @@ -287,7 +265,7 @@ class TtlMergeOperator : public MergeOperator { const std::deque& operand_list, std::string* new_value, Logger* logger) const override { - const uint32_t ts_len = DBWithTTL::kTSLength; + const uint32_t ts_len = DBWithTTLImpl::kTSLength; std::deque operands_without_ts; for (const auto& operand : operand_list) { @@ -309,9 +287,10 @@ class TtlMergeOperator : public MergeOperator { // Augment the *new_value with the ttl time-stamp int64_t curtime; - if (!DBWithTTL::GetCurrentTime(curtime).ok()) { - Log(logger, "Error: Could not get current time to be attached internally " - "to the new value."); + if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) { + Log(logger, + "Error: Could not get current time to be attached internally " + "to the new value."); return false; } else { char ts_string[ts_len]; @@ -319,16 +298,12 @@ class TtlMergeOperator : public MergeOperator { new_value->append(ts_string, ts_len); return true; } - } - virtual const char* Name() const override { - return "Merge By TTL"; - } + virtual const char* Name() const override { return "Merge By TTL"; } private: std::shared_ptr user_merge_op_; }; - } #endif // ROCKSDB_LITE diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 660ebf7801..7af5985af6 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -4,7 +4,7 @@ #include #include "rocksdb/compaction_filter.h" -#include "utilities/utility_db.h" +#include "utilities/db_ttl.h" #include "util/testharness.h" #include "util/logging.h" #include @@ -45,13 +45,13 @@ class TtlTest { void OpenTtl() { ASSERT_TRUE(db_ttl_ == nullptr); // db should be closed before opening again - ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_)); + ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_)); } // Open database with TTL support when TTL provided with db_ttl_ pointer void OpenTtl(int32_t ttl) { ASSERT_TRUE(db_ttl_ == nullptr); - ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl)); + ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl)); } // Open with TestFilter compaction filter @@ -65,7 +65,7 @@ class TtlTest { // Open database with TTL support in read_only mode void OpenReadOnlyTtl(int32_t ttl) { ASSERT_TRUE(db_ttl_ == nullptr); - ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl, true)); + ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl, true)); } void CloseTtl() { @@ -317,7 +317,7 @@ class TtlTest { // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer const int64_t kSampleSize_ = 100; std::string dbname_; - StackableDB* db_ttl_; + DBWithTTL* db_ttl_; private: Options options_; @@ -532,25 +532,33 @@ TEST(TtlTest, ColumnFamiliesTest) { std::vector handles; - ASSERT_OK(UtilityDB::OpenTtlDB(DBOptions(options), dbname_, column_families, - &handles, &db_ttl_, {2, 4}, false)); + ASSERT_OK(DBWithTTL::Open(DBOptions(options), dbname_, column_families, + &handles, &db_ttl_, {2, 4}, false)); ASSERT_EQ(handles.size(), 2U); + ColumnFamilyHandle* new_handle; + ASSERT_OK(db_ttl_->CreateColumnFamilyWithTtl(options, "ttl_column_family_2", + &new_handle, 2)); + handles.push_back(new_handle); MakeKVMap(kSampleSize_); PutValues(0, kSampleSize_, false, handles[0]); PutValues(0, kSampleSize_, false, handles[1]); + PutValues(0, kSampleSize_, false, handles[2]); // everything should be there after 1 second SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]); SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]); + SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[2]); // only column family 1 should be alive after 3 seconds SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]); SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]); + SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]); // nothing should be there after 5 seconds SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]); SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]); + SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]); for (auto h : handles) { delete h;