diff --git a/.gitignore b/.gitignore index 974991fd8d..a3a70ee311 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,10 @@ build_config.mk *_bench *_stress *.out +*.class +*.jar +*.*jnilib* +*.d-e ldb manifest_dump @@ -23,3 +27,5 @@ coverage/COVERAGE_REPORT .gdbhistory .phutil_module_cache tags +java/*.log +java/include/org_rocksdb_*.h diff --git a/HISTORY.md b/HISTORY.md index a3348c054b..0033137671 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,17 +1,15 @@ # Rocksdb Change Log ## Unreleased (will be released in 3.0) -* By default, max_background_flushes is 1 and flush process is - removed from background compaction process. Flush process is now always - executed in high priority thread pool. * Column family support -## Unreleased (will be relased in 2.8) - ### Public API changes +## 2.8.0 (04/04/2014) + * Removed arena.h from public header files. * By default, checksums are verified on every read from database +* Change default value of several options, including: paranoid_checks=true, max_open_files=5000, level0_slowdown_writes_trigger=20, level0_stop_writes_trigger=24, disable_seek_compaction=true, max_background_flushes=1 and allow_mmap_writes=false * Added is_manual_compaction to CompactionFilter::Context * Added "virtual void WaitForJoin()" in class Env. Default operation is no-op. * Removed BackupEngine::DeleteBackupsNewerThan() function @@ -21,13 +19,19 @@ * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added a command "checkconsistency" in ldb tool, which checks if file system state matches DB state (file existence and file sizes) -* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2 +* Separate options related to block based table to a new struct BlockBasedTableOptions +* WriteBatch has a new function Count() to return total size in the batch, and Data() now returns a reference instead of a copy +* Add more counters to perf context. +* Supports several more DB properties: compaction-pending, background-errors and cur-size-active-mem-table. ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, we will ignore it. We assume that writers of these records were interrupted and that we can safely ignore it. -* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. +* A new SST format "PlainTable" is added, which is optimized for memory-only workloads. It can be created through NewPlainTableFactory() or NewTotalOrderPlainTableFactory(). +* A new mem table implementation hash linked list optimizing for the case that there are only few keys for each prefix, which can be created through NewHashLinkListRepFactory(). +* Merge operator supports a new function PartialMergeMulti() to allow users to do partial merges against multiple operands. +* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. The new interface uses a new structure CompactionFilterContext for the same purpose as CompactionFilter::Context in V1. * Geo-spatial support for locations and radial-search. ## 2.7.0 (01/28/2014) diff --git a/INSTALL.md b/INSTALL.md index 86934db69c..2a91be6974 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -67,6 +67,9 @@ libraries. You are on your own. * Please note that some of the optimizations/features are disabled in OSX. 
We did not run any production workloads on it. +* **iOS**: + * Run: `TARGET_OS=IOS make static_lib` + ## Compilation `make clean; make` will compile librocksdb.a (RocksDB static library) and all the unit tests. You can run all unit tests with `make check`. diff --git a/Makefile b/Makefile index 07eb4d2831..9ccd8e4e00 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,14 @@ $(shell (export ROCKSDB_ROOT=$(CURDIR); $(CURDIR)/build_tools/build_detect_platf # this file is generated by the previous line to set build flags and sources include build_config.mk +ifneq ($(PLATFORM), IOS) +CFLAGS += -g +CXXFLAGS += -g +else +# no debug info for IOS, that will make our library big +OPT += -DNDEBUG +endif + # ASAN doesn't work well with jemalloc. If we're compiling with ASAN, we should use regular malloc. ifdef COMPILE_WITH_ASAN # ASAN compile flags @@ -37,8 +45,8 @@ else endif WARNING_FLAGS = -Wall -Werror -Wno-sign-compare -CFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) -CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual +CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) +CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual LDFLAGS += $(PLATFORM_LDFLAGS) @@ -149,11 +157,15 @@ $(SHARED3): endif # PLATFORM_SHARED_EXT .PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \ - release tags valgrind_check whitebox_crash_test format shared_lib all \ + release tags valgrind_check whitebox_crash_test format static_lib shared_lib all \ dbg all: $(LIBRARY) $(PROGRAMS) +static_lib: $(LIBRARY) + +shared_lib: $(SHARED) + dbg: $(LIBRARY) $(PROGRAMS) # Will also generate shared libraries. @@ -219,8 +231,6 @@ tags: format: build_tools/format-diff.sh -shared_lib: $(SHARED) - # --------------------------------------------------------------------------- # Unit tests and tools # --------------------------------------------------------------------------- @@ -406,7 +416,7 @@ ldb: tools/ldb.o $(LIBOBJECTS) # --------------------------------------------------------------------------- # Jni stuff # --------------------------------------------------------------------------- -JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc ./java/rocksjni/options.cc +JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc ./java/rocksjni/options.cc ./java/rocksjni/write_batch.cc JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux ROCKSDBJNILIB = ./java/librocksdbjni.so @@ -419,6 +429,7 @@ endif jni: clean OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32 cd java;$(MAKE) java; + rm -f $(ROCKSDBJNILIB) $(CXX) $(CXXFLAGS) -I./java/. 
$(JAVA_INCLUDE) -shared -fPIC -o $(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(LDFLAGS) $(COVERAGEFLAGS) jclean: @@ -426,7 +437,7 @@ jclean: rm -f $(ROCKSDBJNILIB) jtest: - cd java;$(MAKE) sample; + cd java;$(MAKE) sample;$(MAKE) test; # --------------------------------------------------------------------------- # Platform-specific compilation @@ -438,20 +449,20 @@ ifeq ($(PLATFORM), IOS) PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer -IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/versionCFBundleShortVersionString) +IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString) .cc.o: mkdir -p ios-x86/$(dir $@) - $(SIMULATORROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ $(COVERAGEFLAGS) + $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@ mkdir -p ios-arm/$(dir $@) - $(DEVICEROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ $(COVERAGEFLAGS) + xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -arch armv7s -arch arm64 -c $< -o ios-arm/$@ lipo ios-x86/$@ ios-arm/$@ -create -output $@ .c.o: mkdir -p ios-x86/$(dir $@) - $(SIMULATORROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ + $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@ mkdir -p ios-arm/$(dir $@) - $(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ + xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -arch armv7s -arch arm64 -c $< -o ios-arm/$@ lipo ios-x86/$@ ios-arm/$@ -create -output $@ else diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 5a15aca333..94aafd62ef 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -87,7 +87,7 @@ PLATFORM_SHARED_CFLAGS="-fPIC" PLATFORM_SHARED_VERSIONED=false # generic port files (working on all platform by #ifdef) go directly in /port -GENERIC_PORT_FILES=`find $ROCKSDB_ROOT/port -name '*.cc' | tr "\n" " "` +GENERIC_PORT_FILES=`cd $ROCKSDB_ROOT; find port -name '*.cc' | tr "\n" " "` # On GCC, we pick libc's memcmp over GCC's memcmp via -fno-builtin-memcmp case "$TARGET_OS" in @@ -98,6 +98,13 @@ case "$TARGET_OS" in PLATFORM_SHARED_LDFLAGS="-dynamiclib -install_name " # PORT_FILES=port/darwin/darwin_specific.cc ;; + IOS) + PLATFORM=IOS + COMMON_FLAGS="$COMMON_FLAGS -DOS_MACOSX -DIOS_CROSS_COMPILE" + PLATFORM_SHARED_EXT=dylib + PLATFORM_SHARED_LDFLAGS="-dynamiclib -install_name " + CROSS_COMPILE=true + ;; Linux) PLATFORM=OS_LINUX COMMON_FLAGS="$COMMON_FLAGS -DOS_LINUX" diff --git a/db/c.cc b/db/c.cc index 2e55c0ea1a..b566daf648 100644 --- a/db/c.cc +++ b/db/c.cc @@ -25,12 +25,14 @@ #include "rocksdb/universal_compaction.h" #include "rocksdb/statistics.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/table.h" using rocksdb::Cache; using rocksdb::Comparator; using rocksdb::CompressionType; using rocksdb::DB; 
using rocksdb::Env; +using rocksdb::InfoLogLevel; using rocksdb::FileLock; using rocksdb::FilterPolicy; using rocksdb::FlushOptions; @@ -656,6 +658,11 @@ void rocksdb_options_set_info_log(rocksdb_options_t* opt, rocksdb_logger_t* l) { } } +void rocksdb_options_set_info_log_level( + rocksdb_options_t* opt, int v) { + opt->rep.info_log_level = static_cast(v); +} + void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) { opt->rep.write_buffer_size = s; } @@ -714,6 +721,14 @@ void rocksdb_options_set_max_grandparent_overlap_factor( opt->rep.max_grandparent_overlap_factor = n; } +void rocksdb_options_set_max_bytes_for_level_multiplier_additional( + rocksdb_options_t* opt, int* level_values, size_t num_levels) { + opt->rep.max_bytes_for_level_multiplier_additional.resize(num_levels); + for (size_t i = 0; i < num_levels; ++i) { + opt->rep.max_bytes_for_level_multiplier_additional[i] = level_values[i]; + } +} + void rocksdb_options_enable_statistics(rocksdb_options_t* opt) { opt->rep.statistics = rocksdb::CreateDBStatistics(); } @@ -857,6 +872,24 @@ void rocksdb_options_set_advise_random_on_open( opt->rep.advise_random_on_open = v; } +void rocksdb_options_set_access_hint_on_compaction_start( + rocksdb_options_t* opt, int v) { + switch(v) { + case 0: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::NONE; + break; + case 1: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::NORMAL; + break; + case 2: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::SEQUENTIAL; + break; + case 3: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::WILLNEED; + break; + } +} + void rocksdb_options_set_use_adaptive_mutex( rocksdb_options_t* opt, unsigned char v) { opt->rep.use_adaptive_mutex = v; @@ -867,6 +900,11 @@ void rocksdb_options_set_bytes_per_sync( opt->rep.bytes_per_sync = v; } +void rocksdb_options_set_verify_checksums_in_compaction( + rocksdb_options_t* opt, unsigned char v) { + opt->rep.verify_checksums_in_compaction = v; +} + void rocksdb_options_set_filter_deletes( rocksdb_options_t* opt, unsigned char v) { opt->rep.filter_deletes = v; @@ -1003,11 +1041,48 @@ void rocksdb_options_set_hash_link_list_rep( opt->rep.memtable_factory.reset(factory); } +void rocksdb_options_set_plain_table_factory( + rocksdb_options_t *opt, uint32_t user_key_len, int bloom_bits_per_key, + double hash_table_ratio, size_t index_sparseness) { + static rocksdb::TableFactory* factory = 0; + if (!factory) { + factory = rocksdb::NewPlainTableFactory( + user_key_len, bloom_bits_per_key, + hash_table_ratio, index_sparseness); + } + opt->rep.table_factory.reset(factory); +} + void rocksdb_options_set_max_successive_merges( rocksdb_options_t* opt, size_t v) { opt->rep.max_successive_merges = v; } +void rocksdb_options_set_min_partial_merge_operands( + rocksdb_options_t* opt, uint32_t v) { + opt->rep.min_partial_merge_operands = v; +} + +void rocksdb_options_set_bloom_locality( + rocksdb_options_t* opt, uint32_t v) { + opt->rep.bloom_locality = v; +} + +void rocksdb_options_set_allow_thread_local( + rocksdb_options_t* opt, unsigned char v) { + opt->rep.allow_thread_local = v; +} + +void rocksdb_options_set_inplace_update_support( + rocksdb_options_t* opt, unsigned char v) { + opt->rep.inplace_update_support = v; +} + +void rocksdb_options_set_inplace_update_num_locks( + rocksdb_options_t* opt, size_t v) { + opt->rep.inplace_update_num_locks = v; +} + void rocksdb_options_set_compaction_style(rocksdb_options_t *opt, int style) { opt->rep.compaction_style = 
static_cast<rocksdb::CompactionStyle>(style); } @@ -1022,21 +1097,14 @@ DB::OpenForReadOnly DB::MultiGet DB::KeyMayExist DB::GetOptions -DB::GetLiveFiles DB::GetSortedWalFiles DB::GetLatestSequenceNumber DB::GetUpdatesSince -DB::DeleteFile DB::GetDbIdentity DB::RunManualCompaction custom cache compaction_filter -max_bytes_for_level_multiplier_additional -access_hint_on_compaction_start -table_factory table_properties_collectors -inplace_update_support -inplace_update_num_locks */ rocksdb_comparator_t* rocksdb_comparator_create( diff --git a/db/c_test.c b/db/c_test.c index d8fa8eddb6..e6c5a9e672 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -439,9 +439,11 @@ int main(int argc, char** argv) { rocksdb_close(db); rocksdb_destroy_db(options, dbname, &err); - rocksdb_options_set_filter_policy(options, rocksdb_filterpolicy_create_bloom(10)); + rocksdb_filterpolicy_t* policy = rocksdb_filterpolicy_create_bloom(10); + rocksdb_options_set_filter_policy(options, policy); rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4); + rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16); db = rocksdb_open(options, dbname, &err); CheckNoError(err); @@ -477,6 +479,7 @@ int main(int argc, char** argv) { rocksdb_iter_get_error(iter, &err); CheckNoError(err); rocksdb_iter_destroy(iter); + rocksdb_filterpolicy_destroy(policy); } StartPhase("cleanup"); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 30e187a191..b7ec66d961 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -649,7 +649,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( FileMetaData* f = nullptr; bool done = false; int start_index = 0; - unsigned int candidate_count; + unsigned int candidate_count = 0; assert(file_by_time.size() == version->files_[level].size()); unsigned int max_files_to_compact = std::min(max_merge_width, diff --git a/db/db_bench.cc b/db/db_bench.cc index 14d886f5c4..17c5a9e55d 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1162,8 +1162,8 @@ class Benchmark { fresh_db = true; if (num_threads > 1) { fprintf(stderr, "filluniquerandom multithreaded not supported" - " set --threads=1"); - exit(1); + ", use 1 thread"); + num_threads = 1; } method = &Benchmark::WriteUniqueRandom; } else if (name == Slice("overwrite")) { diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index bf5132dc96..bdb443dd0b 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -7,6 +7,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#define __STDC_FORMAT_MACROS +#include <inttypes.h> #include #include #include @@ -17,6 +19,7 @@ #include "rocksdb/env.h" #include "port/port.h" #include "util/mutexlock.h" +#include "util/sync_point.h" namespace rocksdb { @@ -115,20 +118,55 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret, } Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { - // First get sorted files in archive dir, then append sorted files from main - // dir to maintain sorted order - - // list wal files in archive dir. + // First get sorted files in db dir, then get sorted files from archived + // dir, to avoid a race condition where a log file is moved to archived + // dir in between. Status s; + // list wal files in main db dir.
+ VectorLogPtr logs; + s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile); + if (!s.ok()) { + return s; + } + + // Reproduce the race condition where a log file is moved + // to archived dir, between these two sync points, used in + // (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1"); + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2"); + + files.clear(); + // list wal files in archive dir. std::string archivedir = ArchivalDirectory(options_.wal_dir); if (env_->FileExists(archivedir)) { - s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); + s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); if (!s.ok()) { return s; } } - // list wal files in main db dir. - return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile); + + uint64_t latest_archived_log_number = 0; + if (!files.empty()) { + latest_archived_log_number = files.back()->LogNumber(); + Log(options_.info_log, "Latest Archived log: %" PRIu64, + latest_archived_log_number); + } + + files.reserve(files.size() + logs.size()); + for (auto& log : logs) { + if (log->LogNumber() > latest_archived_log_number) { + files.push_back(std::move(log)); + } else { + // When the race condition happens, we could see the + // same log in both db dir and archived dir. Simply + // ignore the one in db dir. Note that, if we read + // archived dir first, we would have missed the log file. + Log(options_.info_log, "%s already moved to archive", + log->PathName().c_str()); + } + } + + return s; } } diff --git a/db/db_impl.cc b/db/db_impl.cc index 357d67a2b6..4f0ef367ad 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -66,6 +66,7 @@ #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "util/sync_point.h" namespace rocksdb { @@ -117,6 +118,14 @@ struct DBImpl::CompactionState { total_bytes(0) { } + // Create a client visible context of this compaction + CompactionFilter::Context GetFilterContextV1() { + CompactionFilter::Context context; + context.is_full_compaction = compaction->IsFullCompaction(); + context.is_manual_compaction = compaction->IsManualCompaction(); + return context; + } + // Create a client visible context of this compaction CompactionFilterContext GetFilterContext() { CompactionFilterContext context; @@ -284,6 +293,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { // Use dbname as default result.wal_dir = dbname; } + if (result.wal_dir.back() == '/') { + result.wal_dir = result.wal_dir.substr(result.wal_dir.size() - 1); + } return result; } @@ -719,7 +731,11 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1"); Status s = env_->RenameFile(fname, archived_log_name); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2"); Log(options_.info_log, "Move log file %s to %s -- %s\n", fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); @@ -867,7 +883,7 @@ void DBImpl::PurgeObsoleteWALFiles() { size_t files_del_num = log_files_num - files_keep_num; VectorLogPtr archived_logs; - AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); + GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); if 
(files_del_num > archived_logs.size()) { Log(options_.info_log, "Trying to delete more archived log files than " @@ -1335,7 +1351,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, s = cfd->imm()->InstallMemtableFlushResults( cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, - db_directory_.get()); + db_directory_.get(), log_buffer); } if (s.ok()) { @@ -1679,20 +1695,14 @@ struct CompareLogByPointer { } }; -Status DBImpl::AppendSortedWalsOfType(const std::string& path, +Status DBImpl::GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files, WalFileType log_type) { std::vector all_files; const Status status = env_->GetChildren(path, &all_files); if (!status.ok()) { return status; } - log_files.reserve(log_files.size() + all_files.size()); - VectorLogPtr::iterator pos_start; - if (!log_files.empty()) { - pos_start = log_files.end() - 1; - } else { - pos_start = log_files.begin(); - } + log_files.reserve(all_files.size()); for (const auto& f : all_files) { uint64_t number; FileType type; @@ -1718,7 +1728,7 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path, } } CompareLogByPointer compare_log_files; - std::sort(pos_start, log_files.end(), compare_log_files); + std::sort(log_files.begin(), log_files.end(), compare_log_files); return status; } @@ -1941,7 +1951,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, cfd->Ref(); Status flush_status; while (flush_status.ok() && cfd->imm()->IsFlushPending()) { - Log(options_.info_log, + LogToBuffer( + log_buffer, "BackgroundCallFlush doing FlushMemTableToOutputFile with column " "family %u, flush slots available %d", cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_); @@ -2398,7 +2409,8 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } -Status DBImpl::InstallCompactionResults(CompactionState* compact) { +Status DBImpl::InstallCompactionResults(CompactionState* compact, + LogBuffer* log_buffer) { mutex_.AssertHeld(); // paranoia: verify that the files that we started with @@ -2414,12 +2426,12 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return Status::Corruption("Compaction input files inconsistent"); } - Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->output_level(), - static_cast(compact->total_bytes)); + LogToBuffer(log_buffer, "Compacted %d@%d + %d@%d files => %lld bytes", + compact->compaction->num_input_files(0), + compact->compaction->level(), + compact->compaction->num_input_files(1), + compact->compaction->output_level(), + static_cast(compact->total_bytes)); // Add compaction outputs compact->compaction->AddInputDeletions(compact->compaction->edit()); @@ -2491,7 +2503,7 @@ Status DBImpl::ProcessKeyValueCompaction( auto compaction_filter = cfd->options()->compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; if (!compaction_filter) { - auto context = compact->GetFilterContext(); + auto context = compact->GetFilterContextV1(); compaction_filter_from_factory = cfd->options()->compaction_filter_factory->CreateCompactionFilter( context); @@ -2828,7 +2840,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, int64_t imm_micros = 0; // Micros spent doing imm_ compactions ColumnFamilyData* cfd = compact->compaction->column_family_data(); - Log(options_.info_log, + 
LogToBuffer( + log_buffer, "[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d", cfd->GetID(), compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), @@ -2836,7 +2849,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, options_.max_background_compactions - bg_compaction_scheduled_); char scratch[2345]; compact->compaction->Summary(scratch, sizeof(scratch)); - Log(options_.info_log, "Compaction start summary: %s\n", scratch); + LogToBuffer(log_buffer, "Compaction start summary: %s\n", scratch); assert(cfd->current()->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == nullptr); @@ -2866,6 +2879,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Release mutex while we're actually doing the compaction work mutex_.Unlock(); + log_buffer->FlushBufferToLog(); const uint64_t start_micros = env_->NowMicros(); unique_ptr input(versions_->MakeInputIterator(compact->compaction)); @@ -3083,11 +3097,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, ReleaseCompactionUnusedFileNumbers(compact); if (status.ok()) { - status = InstallCompactionResults(compact); + status = InstallCompactionResults(compact, log_buffer); InstallSuperVersion(cfd, deletion_state); } Version::LevelSummaryStorage tmp; - Log(options_.info_log, + LogToBuffer( + log_buffer, "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s\n", @@ -4103,6 +4118,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { } else { unique_ptr lfile; + log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; // Attempt to switch to a new memtable and trigger flush of old. @@ -4121,19 +4137,27 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { // (compression, etc) but err on the side of caution. lfile->SetPreallocationBlockSize(1.1 * cfd->options()->write_buffer_size); + new_log = new log::Writer(std::move(lfile)); new_mem = new MemTable(cfd->internal_comparator(), *cfd->options()); new_superversion = new SuperVersion(); } + Log(options_.info_log, + "New memtable created with log file: #%lu\n", + (unsigned long)new_log_number); } mutex_.Lock(); if (!s.ok()) { // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); assert(!new_mem); + assert(!new_log); break; } logfile_number_ = new_log_number; - log_.reset(new log::Writer(std::move(lfile))); + assert(new_log != nullptr); + // TODO(icanadi) delete outside of mutex + delete log_.release(); + log_.reset(new_log); cfd->mem()->SetNextLogNumber(logfile_number_); cfd->imm()->Add(cfd->mem()); if (force) { @@ -4157,6 +4181,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { cfd->GetID(), (unsigned long)logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); + // TODO(icanadi) delete outside of mutex) delete cfd->InstallSuperVersion(new_superversion, &mutex_); } } diff --git a/db/db_impl.h b/db/db_impl.h index d754a6e518..4130c34543 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -311,8 +311,11 @@ class DBImpl : public DB { LogBuffer* log_buffer); uint64_t SlowdownAmount(int n, double bottom, double top); + + // TODO(icanadi) free superversion_to_free and old_log outside of mutex Status MakeRoomForWrite(ColumnFamilyData* cfd, bool force /* flush even if there is room? 
*/); + void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group); @@ -360,15 +363,16 @@ class DBImpl : public DB { Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); - Status InstallCompactionResults(CompactionState* compact); + Status InstallCompactionResults(CompactionState* compact, + LogBuffer* log_buffer); void AllocateCompactionOutputFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); void PurgeObsoleteWALFiles(); - Status AppendSortedWalsOfType(const std::string& path, - VectorLogPtr& log_files, - WalFileType type); + Status GetSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType type); // Requires: all_logs should be sorted with earliest log file first // Retains all log files in all_logs which contain updates with seq no. diff --git a/db/db_test.cc b/db/db_test.cc index 9f6230a822..b97273d2eb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -37,6 +37,7 @@ #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" +#include "util/sync_point.h" #include "util/testutil.h" namespace rocksdb { @@ -2628,7 +2629,7 @@ class KeepFilterFactory : public CompactionFilterFactory { : check_context_(check_context) {} virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilterContext& context) override { + const CompactionFilter::Context& context) override { if (check_context_) { ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction); ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); @@ -2645,7 +2646,7 @@ class KeepFilterFactory : public CompactionFilterFactory { class DeleteFilterFactory : public CompactionFilterFactory { public: virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilterContext& context) override { + const CompactionFilter::Context& context) override { if (context.is_manual_compaction) { return std::unique_ptr(new DeleteFilter()); } else { @@ -2661,7 +2662,7 @@ class ChangeFilterFactory : public CompactionFilterFactory { explicit ChangeFilterFactory() {} virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilterContext& context) override { + const CompactionFilter::Context& context) override { return std::unique_ptr(new ChangeFilter()); } @@ -5386,6 +5387,51 @@ TEST(DBTest, TransactionLogIterator) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorRace) { + // Setup sync point dependency to reproduce the race condition of + // a log file moved to archived dir, in the middle of GetSortedWalFiles + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" }, + { "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" }, + }); + + do { + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key2", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key3", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key4", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(4, iter); + } + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // trigger async flush, and log move. 
Well, log move will + // wait until the GetSortedWalFiles:1 to reproduce the race + // condition + FlushOptions flush_options; + flush_options.wait = false; + dbfull()->Flush(flush_options); + + // "key5" would be written in a new memtable and log + Put("key5", DummyString(1024)); + { + // this iter would miss "key4" if not fixed + auto iter = OpenTransactionLogIter(0); + ExpectRecords(5, iter); + } + } while (ChangeCompactOptions()); +} + TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { do { Options options = OptionsForLogIterTest(); diff --git a/db/memtable.cc b/db/memtable.cc index d8ca68c6d9..73ce600883 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -142,6 +142,11 @@ Slice MemTableRep::UserKey(const char* key) const { return Slice(slice.data(), slice.size() - 8); } +KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { + *buf = arena_->Allocate(len); + return static_cast<KeyHandle>(*buf); +} + // Encode a suitable internal key target for "target" and return it. // Uses *scratch as scratch space, and the returned pointer will point // into this scratch space. @@ -243,7 +248,9 @@ void MemTable::Add(SequenceNumber s, ValueType type, const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; - char* buf = arena_.Allocate(encoded_len); + char* buf = nullptr; + KeyHandle handle = table_->Allocate(encoded_len, &buf); + assert(buf != nullptr); char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; @@ -252,7 +259,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); - table_->Insert(buf); + table_->Insert(handle); if (prefix_bloom_) { assert(prefix_extractor_); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 641ab3ed5f..655ded7f13 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -12,6 +12,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "util/coding.h" +#include "util/log_buffer.h" namespace rocksdb { @@ -145,7 +146,7 @@ Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete, - Directory* db_directory) { + Directory* db_directory, LogBuffer* log_buffer) { mu->AssertHeld(); // flush was sucessful @@ -175,9 +176,8 @@ Status MemTableList::InstallMemtableFlushResults( break; } - Log(info_log, - "Level-0 commit table #%lu started", - (unsigned long)m->file_number_); + LogToBuffer(log_buffer, "Level-0 commit table #%lu started", + (unsigned long)m->file_number_); // this can release and reacquire the mutex. s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory); @@ -191,10 +191,8 @@ Status MemTableList::InstallMemtableFlushResults( uint64_t mem_id = 1; // how many memtables has been flushed.
do { if (s.ok()) { // commit new state - Log(info_log, - "Level-0 commit table #%lu: memtable #%lu done", - (unsigned long)m->file_number_, - (unsigned long)mem_id); + LogToBuffer(log_buffer, "Level-0 commit table #%lu: memtable #%lu done", + (unsigned long)m->file_number_, (unsigned long)mem_id); current_->Remove(m); assert(m->file_number_ > 0); diff --git a/db/memtable_list.h b/db/memtable_list.h index 31f189322d..903305779c 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -21,6 +21,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "util/autovector.h" +#include "util/log_buffer.h" namespace rocksdb { @@ -110,7 +111,8 @@ class MemTableList { Logger* info_log, uint64_t file_number, std::set& pending_outputs, autovector* to_delete, - Directory* db_directory); + Directory* db_directory, + LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/table_cache.h b/db/table_cache.h index 02063bdfda..97e0f6a279 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -25,6 +25,8 @@ namespace rocksdb { class Env; struct FileMetaData; +// TODO(sdong): try to come up with a better API to pass the file information +// other than simply passing FileMetaData. class TableCache { public: TableCache(const std::string& dbname, const Options* options, diff --git a/db/version_set.cc b/db/version_set.cc index 76d83693f6..d6c6b5772e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -143,6 +143,18 @@ bool SomeFileOverlapsRange( return !BeforeFile(ucmp, largest_user_key, files[index]); } +namespace { +// Used for LevelFileNumIterator to pass "block handle" value, +// which actually means file information in this iterator. +// It contains subset of fields of FileMetaData, that is sufficient +// for table cache to use. +struct EncodedFileMetaData { + uint64_t number; // file number + uint64_t file_size; // file size + Cache::Handle* table_reader_handle; // cached table reader's handler +}; +} // namespace + // An internal iterator. For a given version/level pair, yields // information about the files in the level. 
For a given entry, key() // is the largest key that occurs in the file, and value() is an @@ -184,14 +196,19 @@ class Version::LevelFileNumIterator : public Iterator { } Slice value() const { assert(Valid()); - return Slice(reinterpret_cast<const char*>((*flist_)[index_]), - sizeof(FileMetaData)); + auto* file_meta = (*flist_)[index_]; + current_value_.number = file_meta->number; + current_value_.file_size = file_meta->file_size; + current_value_.table_reader_handle = file_meta->table_reader_handle; + return Slice(reinterpret_cast<const char*>(&current_value_), + sizeof(EncodedFileMetaData)); } virtual Status status() const { return Status::OK(); } private: const InternalKeyComparator icmp_; const std::vector<FileMetaData*>* const flist_; uint32_t index_; + mutable EncodedFileMetaData current_value_; }; static Iterator* GetFileIterator(void* arg, const ReadOptions& options, @@ -199,7 +216,7 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options, const InternalKeyComparator& icomparator, const Slice& file_value, bool for_compaction) { TableCache* cache = reinterpret_cast<TableCache*>(arg); - if (file_value.size() != sizeof(FileMetaData)) { + if (file_value.size() != sizeof(EncodedFileMetaData)) { return NewErrorIterator( Status::Corruption("FileReader invoked with unexpected value")); } else { @@ -211,11 +228,13 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options, options_copy.prefix = nullptr; } - const FileMetaData* meta_file = - reinterpret_cast<const FileMetaData*>(file_value.data()); + const EncodedFileMetaData* encoded_meta = + reinterpret_cast<const EncodedFileMetaData*>(file_value.data()); + FileMetaData meta(encoded_meta->number, encoded_meta->file_size); + meta.table_reader_handle = encoded_meta->table_reader_handle; return cache->NewIterator( - options.prefix ? options_copy : options, soptions, icomparator, - *meta_file, nullptr /* don't need reference to table*/, for_compaction); + options.prefix ? options_copy : options, soptions, icomparator, meta, + nullptr /* don't need reference to table*/, for_compaction); } } @@ -234,12 +253,13 @@ bool Version::PrefixMayMatch(const ReadOptions& options, // key() will always be the biggest value for this SST?
may_match = true; } else { - const FileMetaData* meta_file = - reinterpret_cast(level_iter->value().data()); - + const EncodedFileMetaData* encoded_meta = + reinterpret_cast( + level_iter->value().data()); + FileMetaData meta(encoded_meta->number, encoded_meta->file_size); + meta.table_reader_handle = encoded_meta->table_reader_handle; may_match = cfd_->table_cache()->PrefixMayMatch( - options, cfd_->internal_comparator(), *meta_file, internal_prefix, - nullptr); + options, cfd_->internal_comparator(), meta, internal_prefix, nullptr); } return may_match; } diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index a6bc900852..7d4a374d9b 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -243,6 +243,7 @@ extern void rocksdb_options_set_paranoid_checks( rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_env(rocksdb_options_t*, rocksdb_env_t*); extern void rocksdb_options_set_info_log(rocksdb_options_t*, rocksdb_logger_t*); +extern void rocksdb_options_set_info_log_level(rocksdb_options_t*, int); extern void rocksdb_options_set_write_buffer_size(rocksdb_options_t*, size_t); extern void rocksdb_options_set_max_open_files(rocksdb_options_t*, int); extern void rocksdb_options_set_cache(rocksdb_options_t*, rocksdb_cache_t*); @@ -275,6 +276,8 @@ extern void rocksdb_options_set_expanded_compaction_factor( rocksdb_options_t*, int); extern void rocksdb_options_set_max_grandparent_overlap_factor( rocksdb_options_t*, int); +extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional( + rocksdb_options_t*, int* level_values, size_t num_levels); extern void rocksdb_options_enable_statistics(rocksdb_options_t*); extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int); @@ -330,10 +333,14 @@ extern void rocksdb_options_set_block_size_deviation( rocksdb_options_t*, int); extern void rocksdb_options_set_advise_random_on_open( rocksdb_options_t*, unsigned char); +extern void rocksdb_options_set_access_hint_on_compaction_start( + rocksdb_options_t*, int); extern void rocksdb_options_set_use_adaptive_mutex( rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_bytes_per_sync( rocksdb_options_t*, uint64_t); +extern void rocksdb_options_set_verify_checksums_in_compaction( + rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_filter_deletes( rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_max_sequential_skip_in_iterations( @@ -348,6 +355,7 @@ extern void rocksdb_options_prepare_for_bulk_load(rocksdb_options_t*); extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*); extern void rocksdb_options_set_hash_skip_list_rep(rocksdb_options_t*, size_t, int32_t, int32_t); extern void rocksdb_options_set_hash_link_list_rep(rocksdb_options_t*, size_t); +extern void rocksdb_options_set_plain_table_factory(rocksdb_options_t*, uint32_t, int, double, size_t); extern void rocksdb_options_set_max_bytes_for_level_base(rocksdb_options_t* opt, uint64_t n); extern void rocksdb_options_set_stats_dump_period_sec(rocksdb_options_t* opt, unsigned int sec); @@ -360,6 +368,16 @@ extern void rocksdb_options_set_memtable_prefix_bloom_probes( rocksdb_options_t*, uint32_t); extern void rocksdb_options_set_max_successive_merges( rocksdb_options_t*, size_t); +extern void rocksdb_options_set_min_partial_merge_operands( + rocksdb_options_t*, uint32_t); +extern void rocksdb_options_set_bloom_locality( + rocksdb_options_t*, uint32_t); +extern void rocksdb_options_set_allow_thread_local( + rocksdb_options_t*, unsigned 
char); +extern void rocksdb_options_set_inplace_update_support( + rocksdb_options_t*, unsigned char); +extern void rocksdb_options_set_inplace_update_num_locks( + rocksdb_options_t*, size_t); enum { rocksdb_no_compression = 0, diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index f54ee620c5..59b050923e 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -31,6 +31,15 @@ struct CompactionFilterContext { class CompactionFilter { public: + // Context information of a compaction run + struct Context { + // Does this compaction run include all data files + bool is_full_compaction; + // Is this compaction requested by the client (true), + // or is it occurring as an automatic compaction process + bool is_manual_compaction; + }; + virtual ~CompactionFilter() {} // The compaction process invokes this @@ -105,7 +114,7 @@ class CompactionFilterFactory { virtual ~CompactionFilterFactory() { } virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilterContext& context) = 0; + const CompactionFilter::Context& context) = 0; // Returns a name that identifies this compaction filter factory. virtual const char* Name() const = 0; @@ -115,8 +124,8 @@ class CompactionFilterFactory { // return any filter class DefaultCompactionFilterFactory : public CompactionFilterFactory { public: - virtual std::unique_ptr - CreateCompactionFilter(const CompactionFilterContext& context) override { + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { return std::unique_ptr(nullptr); } diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 6c65bdc3fa..05f1aebca3 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -45,6 +45,8 @@ class LookupKey; class Slice; class SliceTransform; +typedef void* KeyHandle; + class MemTableRep { public: // KeyComparator provides a means to compare keys, which are internal keys @@ -62,11 +64,19 @@ class MemTableRep { virtual ~KeyComparator() { } }; + explicit MemTableRep(Arena* arena) : arena_(arena) {} + + // Allocate a buf of len size for storing key. The idea is that a specific + // memtable representation knows its underlying data structure better. By + // allowing it to allocate memory, it can possibly put correlated stuff + // in consecutive memory area to make processor prefetching more efficient. + virtual KeyHandle Allocate(const size_t len, char** buf); + // Insert key into the collection. (The caller will pack key and value into a - // single buffer and pass that in as the parameter to Insert) + // single buffer and pass that in as the parameter to Insert). // REQUIRES: nothing that compares equal to key is currently in the // collection. - virtual void Insert(const char* key) = 0; + virtual void Insert(KeyHandle handle) = 0; // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const = 0; @@ -153,6 +163,8 @@ class MemTableRep { // When *key is an internal key concatenated with the value, returns the // user key. 
virtual Slice UserKey(const char* key) const; + + Arena* arena_; }; // This is the base class for all factories that are used by RocksDB to create diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 61adad6b73..63eddb61d5 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -64,7 +64,11 @@ struct PerfContext { uint64_t write_memtable_time; }; +#if defined(IOS_CROSS_COMPILE) +extern PerfContext perf_context; +#else extern __thread PerfContext perf_context; +#endif } diff --git a/java/Makefile b/java/Makefile index 8168d3418c..10dd4f1104 100644 --- a/java/Makefile +++ b/java/Makefile @@ -1,4 +1,4 @@ -NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions NATIVE_INCLUDE = ./include ROCKSDB_JAR = rocksdbjni.jar @@ -19,3 +19,6 @@ sample: java -ea -Djava.library.path=.:../ -cp ".:./*" RocksDBSample /tmp/rocksdbjni @rm -rf /tmp/rocksdbjni @rm -rf /tmp/rocksdbjni_not_found + +test: + java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.WriteBatchTest diff --git a/java/RocksDBSample.java b/java/RocksDBSample.java index 4e06bb29de..e6421778cc 100644 --- a/java/RocksDBSample.java +++ b/java/RocksDBSample.java @@ -93,6 +93,21 @@ public class RocksDBSample { assert(len == RocksDB.NOT_FOUND); len = db.get(testKey, enoughArray); assert(len == testValue.length); + + db.remove(testKey); + len = db.get(testKey, enoughArray); + assert(len == RocksDB.NOT_FOUND); + + // repeat the test with WriteOptions + WriteOptions writeOpts = new WriteOptions(); + writeOpts.setSync(true); + writeOpts.setDisableWAL(true); + db.put(writeOpts, testKey, testValue); + len = db.get(testKey, enoughArray); + assert(len == testValue.length); + assert(new String(testValue).equals( + new String(enoughArray, 0, len))); + writeOpts.dispose(); } catch (RocksDBException e) { System.err.println(e); } diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index b086940815..bdab8be1b3 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1,4 +1,4 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// Copyright (c) 2014, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. @@ -61,7 +61,18 @@ public class RocksDB { * @param value the value associated with the specified key. */ public void put(byte[] key, byte[] value) throws RocksDBException { - put(key, key.length, value, value.length); + put(nativeHandle_, key, key.length, value, value.length); + } + + /** + * Set the database entry for "key" to "value". + * + * @param key the specified key to be inserted. + * @param value the value associated with the specified key. + */ + public void put(WriteOptions writeOpts, byte[] key, byte[] value) + throws RocksDBException { + put(nativeHandle_, writeOpts.nativeHandle_, key, key.length, value, value.length); } /** @@ -77,7 +88,7 @@ public class RocksDB { * found. 
*/ public int get(byte[] key, byte[] value) throws RocksDBException { - return get(key, key.length, value, value.length); + return get(nativeHandle_, key, key.length, value, value.length); } /** @@ -92,7 +103,26 @@ public class RocksDB { * @see RocksDBException */ public byte[] get(byte[] key) throws RocksDBException { - return get(key, key.length); + return get(nativeHandle_, key, key.length); + } + + /** + * Remove the database entry (if any) for "key". Returns OK on + * success, and a non-OK status on error. It is not an error if "key" + * did not exist in the database. + */ + public void remove(byte[] key) throws RocksDBException { + remove(nativeHandle_, key, key.length); + } + + /** + * Remove the database entry (if any) for "key". Returns OK on + * success, and a non-OK status on error. It is not an error if "key" + * did not exist in the database. + */ + public void remove(WriteOptions writeOpt, byte[] key) + throws RocksDBException { + remove(nativeHandle_, writeOpt.nativeHandle_, key, key.length); } @Override protected void finalize() { @@ -108,14 +138,24 @@ public class RocksDB { // native methods private native void open0(String path) throws RocksDBException; - private native void open(long optionsHandle, String path) throws RocksDBException; + private native void open( + long optionsHandle, String path) throws RocksDBException; private native void put( + long handle, byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + private native void put( + long handle, long writeOptHandle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; private native int get( - byte[] key, int keyLen, + long handle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; private native byte[] get( + long handle, byte[] key, int keyLen) throws RocksDBException; + private native void remove( + long handle, byte[] key, int keyLen) throws RocksDBException; + private native void remove( + long handle, long writeOptHandle, byte[] key, int keyLen) throws RocksDBException; private native void close0(); diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java new file mode 100644 index 0000000000..acacee3f0a --- /dev/null +++ b/java/org/rocksdb/WriteBatch.java @@ -0,0 +1,121 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.lang.*; +import java.util.*; + +/** + * WriteBatch holds a collection of updates to apply atomically to a DB. + * + * The updates are applied in the order in which they are added + * to the WriteBatch. For example, the value of "key" will be "v3" + * after the following batch is written: + * + * batch.put("key", "v1"); + * batch.remove("key"); + * batch.put("key", "v2"); + * batch.put("key", "v3"); + * + * Multiple threads can invoke const methods on a WriteBatch without + * external synchronization, but if any of the threads may call a + * non-const method, all threads accessing the same WriteBatch must use + * external synchronization. + */ +public class WriteBatch { + public WriteBatch() { + nativeHandle_ = 0; + newWriteBatch(0); + } + + public WriteBatch(int reserved_bytes) { + nativeHandle_ = 0; + newWriteBatch(reserved_bytes); + } + + /** + * Returns the number of updates in the batch. 
+ */ + public native int count(); + + /** + * Store the mapping "key->value" in the database. + */ + public void put(byte[] key, byte[] value) { + put(key, key.length, value, value.length); + } + + /** + * Merge "value" with the existing value of "key" in the database. + * "key->merge(existing, value)" + */ + public void merge(byte[] key, byte[] value) { + merge(key, key.length, value, value.length); + } + + /** + * If the database contains a mapping for "key", erase it. Else do nothing. + */ + public void remove(byte[] key) { + remove(key, key.length); + } + + /** + * Append a blob of arbitrary size to the records in this batch. The blob will + * be stored in the transaction log but not in any other file. In particular, + * it will not be persisted to the SST files. When iterating over this + * WriteBatch, WriteBatch::Handler::LogData will be called with the contents + * of the blob as it is encountered. Blobs, puts, deletes, and merges will be + * encountered in the same order in thich they were inserted. The blob will + * NOT consume sequence number(s) and will NOT increase the count of the batch + * + * Example application: add timestamps to the transaction log for use in + * replication. + */ + public void putLogData(byte[] blob) { + putLogData(blob, blob.length); + } + + /** + * Clear all updates buffered in this batch + */ + public native void clear(); + + /** + * Delete the c++ side pointer. + */ + public synchronized void dispose() { + if (nativeHandle_ != 0) { + dispose0(); + } + } + + @Override protected void finalize() { + dispose(); + } + + private native void newWriteBatch(int reserved_bytes); + private native void put(byte[] key, int keyLen, + byte[] value, int valueLen); + private native void merge(byte[] key, int keyLen, + byte[] value, int valueLen); + private native void remove(byte[] key, int keyLen); + private native void putLogData(byte[] blob, int blobLen); + private native void dispose0(); + + private long nativeHandle_; +} + +/** + * Package-private class which provides java api to access + * c++ WriteBatchInternal. + */ +class WriteBatchInternal { + static native void setSequence(WriteBatch batch, long sn); + static native long sequence(WriteBatch batch); + static native void append(WriteBatch b1, WriteBatch b2); +} + diff --git a/java/org/rocksdb/WriteBatchTest.java b/java/org/rocksdb/WriteBatchTest.java new file mode 100644 index 0000000000..283caca65c --- /dev/null +++ b/java/org/rocksdb/WriteBatchTest.java @@ -0,0 +1,125 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +package org.rocksdb; + +import java.util.*; +import java.lang.*; +import java.io.UnsupportedEncodingException; + +/** + * This class mimics the db/write_batch_test.cc in the c++ rocksdb library. 
+ */ +public class WriteBatchTest { + static { + System.loadLibrary("rocksdbjni"); + } + + public static void main(String args[]) { + System.out.println("Testing WriteBatchTest.Empty ==="); + Empty(); + + System.out.println("Testing WriteBatchTest.Multiple ==="); + Multiple(); + + System.out.println("Testing WriteBatchTest.Append ==="); + Append(); + + System.out.println("Testing WriteBatchTest.Blob ==="); + Blob(); + + // The following tests have not yet ported. + // Continue(); + // PutGatherSlices(); + + System.out.println("Passed all WriteBatchTest!"); + } + + static void Empty() { + WriteBatch batch = new WriteBatch(); + assert(batch.count() == 0); + } + + static void Multiple() { + try { + WriteBatch batch = new WriteBatch(); + batch.put("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII")); + batch.remove("box".getBytes("US-ASCII")); + batch.put("baz".getBytes("US-ASCII"), "boo".getBytes("US-ASCII")); + WriteBatchInternal.setSequence(batch, 100); + assert(100 == WriteBatchInternal.sequence(batch)); + assert(3 == batch.count()); + assert(new String("Put(baz, boo)@102" + + "Delete(box)@101" + + "Put(foo, bar)@100") + .equals(new String(getContents(batch), "US-ASCII"))); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + assert(false); + } + } + + static void Append() { + WriteBatch b1 = new WriteBatch(); + WriteBatch b2 = new WriteBatch(); + WriteBatchInternal.setSequence(b1, 200); + WriteBatchInternal.setSequence(b2, 300); + WriteBatchInternal.append(b1, b2); + assert(getContents(b1).length == 0); + assert(b1.count() == 0); + try { + b2.put("a".getBytes("US-ASCII"), "va".getBytes("US-ASCII")); + WriteBatchInternal.append(b1, b2); + assert("Put(a, va)@200".equals(new String(getContents(b1), "US-ASCII"))); + assert(1 == b1.count()); + b2.clear(); + b2.put("b".getBytes("US-ASCII"), "vb".getBytes("US-ASCII")); + WriteBatchInternal.append(b1, b2); + assert(new String("Put(a, va)@200" + + "Put(b, vb)@201") + .equals(new String(getContents(b1), "US-ASCII"))); + assert(2 == b1.count()); + b2.remove("foo".getBytes("US-ASCII")); + WriteBatchInternal.append(b1, b2); + assert(new String("Put(a, va)@200" + + "Put(b, vb)@202" + + "Put(b, vb)@201" + + "Delete(foo)@203") + .equals(new String(getContents(b1), "US-ASCII"))); + assert(4 == b1.count()); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + assert(false); + } + } + + static void Blob() { + WriteBatch batch = new WriteBatch(); + try { + batch.put("k1".getBytes("US-ASCII"), "v1".getBytes("US-ASCII")); + batch.put("k2".getBytes("US-ASCII"), "v2".getBytes("US-ASCII")); + batch.put("k3".getBytes("US-ASCII"), "v3".getBytes("US-ASCII")); + batch.putLogData("blob1".getBytes("US-ASCII")); + batch.remove("k2".getBytes("US-ASCII")); + batch.putLogData("blob2".getBytes("US-ASCII")); + batch.merge("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII")); + assert(5 == batch.count()); + assert(new String("Merge(foo, bar)@4" + + "Put(k1, v1)@0" + + "Delete(k2)@3" + + "Put(k2, v2)@1" + + "Put(k3, v3)@2") + .equals(new String(getContents(batch), "US-ASCII"))); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + assert(false); + } + } + + static native byte[] getContents(WriteBatch batch); +} diff --git a/java/org/rocksdb/WriteOptions.java b/java/org/rocksdb/WriteOptions.java new file mode 100644 index 0000000000..26f0e2b7c9 --- /dev/null +++ b/java/org/rocksdb/WriteOptions.java @@ -0,0 +1,96 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+/**
+ * Options that control write operations.
+ *
+ * Note that developers should call WriteOptions.dispose() to release the
+ * c++ side memory before a WriteOptions instance goes out of scope.
+ */
+public class WriteOptions {
+  public WriteOptions() {
+    nativeHandle_ = 0;
+    newWriteOptions();
+  }
+
+  public synchronized void dispose() {
+    if (nativeHandle_ != 0) {
+      dispose0(nativeHandle_);
+    }
+  }
+
+  /**
+   * If true, the write will be flushed from the operating system
+   * buffer cache (by calling WritableFile::Sync()) before the write
+   * is considered complete. If this flag is true, writes will be
+   * slower.
+   *
+   * If this flag is false, and the machine crashes, some recent
+   * writes may be lost. Note that if it is just the process that
+   * crashes (i.e., the machine does not reboot), no writes will be
+   * lost even if sync==false.
+   *
+   * In other words, a DB write with sync==false has similar
+   * crash semantics as the "write()" system call. A DB write
+   * with sync==true has similar crash semantics to a "write()"
+   * system call followed by "fdatasync()".
+   *
+   * Default: false
+   */
+  public void setSync(boolean flag) {
+    setSync(nativeHandle_, flag);
+  }
+
+  /**
+   * If true, the write will be flushed from the operating system
+   * buffer cache (by calling WritableFile::Sync()) before the write
+   * is considered complete. If this flag is true, writes will be
+   * slower.
+   *
+   * If this flag is false, and the machine crashes, some recent
+   * writes may be lost. Note that if it is just the process that
+   * crashes (i.e., the machine does not reboot), no writes will be
+   * lost even if sync==false.
+   *
+   * In other words, a DB write with sync==false has similar
+   * crash semantics as the "write()" system call. A DB write
+   * with sync==true has similar crash semantics to a "write()"
+   * system call followed by "fdatasync()".
+   */
+  public boolean sync() {
+    return sync(nativeHandle_);
+  }
+
+  /**
+   * If true, writes will not first go to the write ahead log,
+   * and the write may be lost after a crash.
+   */
+  public void setDisableWAL(boolean flag) {
+    setDisableWAL(nativeHandle_, flag);
+  }
+
+  /**
+   * If true, writes will not first go to the write ahead log,
+   * and the write may be lost after a crash.
+ */ + public boolean disableWAL() { + return disableWAL(nativeHandle_); + } + + @Override protected void finalize() { + dispose(); + } + + private native void newWriteOptions(); + private native void setSync(long handle, boolean flag); + private native boolean sync(long handle); + private native void setDisableWAL(long handle, boolean flag); + private native boolean disableWAL(long handle); + private native void dispose0(long handle); + + protected long nativeHandle_; +} diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index ef308bc4d0..69224f6d0d 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -11,8 +11,10 @@ #include #include "include/org_rocksdb_Options.h" +#include "include/org_rocksdb_WriteOptions.h" #include "rocksjni/portal.h" #include "rocksdb/db.h" +#include "rocksdb/options.h" /* * Class: org_rocksdb_Options @@ -55,3 +57,72 @@ jboolean Java_org_rocksdb_Options_createIfMissing( JNIEnv* env, jobject jobj, jlong jhandle) { return reinterpret_cast(jhandle)->create_if_missing; } + +////////////////////////////////////////////////////////////////////////////// +// WriteOptions + +/* + * Class: org_rocksdb_WriteOptions + * Method: newWriteOptions + * Signature: ()V + */ +void Java_org_rocksdb_WriteOptions_newWriteOptions( + JNIEnv* env, jobject jwrite_options) { + rocksdb::WriteOptions* op = new rocksdb::WriteOptions(); + rocksdb::WriteOptionsJni::setHandle(env, jwrite_options, op); +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: dispose0 + * Signature: ()V + */ +void Java_org_rocksdb_WriteOptions_dispose0( + JNIEnv* env, jobject jwrite_options, jlong jhandle) { + auto write_options = reinterpret_cast(jhandle); + delete write_options; + + rocksdb::WriteOptionsJni::setHandle(env, jwrite_options, nullptr); +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: setSync + * Signature: (JZ)V + */ +void Java_org_rocksdb_WriteOptions_setSync( + JNIEnv* env, jobject jwrite_options, jlong jhandle, jboolean jflag) { + reinterpret_cast(jhandle)->sync = jflag; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: sync + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_WriteOptions_sync( + JNIEnv* env, jobject jwrite_options, jlong jhandle) { + return reinterpret_cast(jhandle)->sync; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: setDisableWAL + * Signature: (JZ)V + */ +void Java_org_rocksdb_WriteOptions_setDisableWAL( + JNIEnv* env, jobject jwrite_options, jlong jhandle, jboolean jflag) { + reinterpret_cast(jhandle)->disableWAL = jflag; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: disableWAL + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_WriteOptions_disableWAL( + JNIEnv* env, jobject jwrite_options, jlong jhandle) { + return reinterpret_cast(jhandle)->disableWAL; +} + + diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index a90b825146..5b0524aece 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -109,5 +109,66 @@ class OptionsJni { } }; +class WriteOptionsJni { + public: + // Get the java class id of org.rocksdb.WriteOptions. 
+ static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/WriteOptions"); + assert(jclazz != nullptr); + return jclazz; + } + + // Get the field id of the member variable of org.rocksdb.WriteOptions + // that stores the pointer to rocksdb::WriteOptions + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::WriteOptions + static rocksdb::WriteOptions* getHandle(JNIEnv* env, jobject jobj) { + return reinterpret_cast( + env->GetLongField(jobj, getHandleFieldID(env))); + } + + // Pass the rocksdb::WriteOptions pointer to the java side. + static void setHandle(JNIEnv* env, jobject jobj, rocksdb::WriteOptions* op) { + env->SetLongField( + jobj, getHandleFieldID(env), + reinterpret_cast(op)); + } +}; + +class WriteBatchJni { + public: + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/WriteBatch"); + assert(jclazz != nullptr); + return jclazz; + } + + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::WriteBatch of the specified + // org.rocksdb.WriteBatch. + static rocksdb::WriteBatch* getHandle(JNIEnv* env, jobject jwb) { + return reinterpret_cast( + env->GetLongField(jwb, getHandleFieldID(env))); + } + + // Pass the rocksdb::WriteBatch pointer to the java side. + static void setHandle(JNIEnv* env, jobject jwb, rocksdb::WriteBatch* wb) { + env->SetLongField( + jwb, getHandleFieldID(env), + reinterpret_cast(wb)); + } +}; } // namespace rocksdb #endif // JAVA_ROCKSJNI_PORTAL_H_ diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index b5d42c0c74..ccd87105dd 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -15,6 +15,9 @@ #include "rocksjni/portal.h" #include "rocksdb/db.h" +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Open + void rocksdb_open_helper( JNIEnv* env, jobject java_db, jstring jdb_path, const rocksdb::Options& opt) { rocksdb::DB* db; @@ -54,27 +57,20 @@ void Java_org_rocksdb_RocksDB_open( rocksdb_open_helper(env, jdb, jdb_path, *options); } -/* - * Class: org_rocksdb_RocksDB - * Method: put - * Signature: ([BI[BI)V - */ -void Java_org_rocksdb_RocksDB_put( - JNIEnv* env, jobject jdb, +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Put + +void rocksdb_put_helper( + JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { - rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); - jboolean isCopy; - jbyte* key = env->GetByteArrayElements(jkey, &isCopy); - jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); - rocksdb::Slice key_slice( - reinterpret_cast(key), jkey_len); - rocksdb::Slice value_slice( - reinterpret_cast(value), jvalue_len); + jbyte* key = env->GetByteArrayElements(jkey, 0); + jbyte* value = env->GetByteArrayElements(jvalue, 0); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); - rocksdb::Status s = db->Put( - rocksdb::WriteOptions(), key_slice, value_slice); + rocksdb::Status s = db->Put(write_options, key_slice, value_slice); // trigger java unref on key and value. 
// by passing JNI_ABORT, it will simply release the reference without @@ -90,12 +86,53 @@ void Java_org_rocksdb_RocksDB_put( /* * Class: org_rocksdb_RocksDB - * Method: get - * Signature: ([BI)[B + * Method: put + * Signature: (J[BI[BI)V */ -jbyteArray Java_org_rocksdb_RocksDB_get___3BI( - JNIEnv* env, jobject jdb, jbyteArray jkey, jint jkey_len) { - rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); +void Java_org_rocksdb_RocksDB_put__J_3BI_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + + rocksdb_put_helper(env, db, default_write_options, + jkey, jkey_len, + jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: put + * Signature: (JJ[BI[BI)V + */ +void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BI( + JNIEnv* env, jobject jdb, + jlong jdb_handle, jlong jwrite_options_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast( + jwrite_options_handle); + + rocksdb_put_helper(env, db, *write_options, + jkey, jkey_len, + jvalue, jvalue_len); +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Get + +/* + * Class: org_rocksdb_RocksDB + * Method: get + * Signature: (J[BI)[B + */ +jbyteArray Java_org_rocksdb_RocksDB_get__J_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len) { + auto db = reinterpret_cast(jdb_handle); jboolean isCopy; jbyte* key = env->GetByteArrayElements(jkey, &isCopy); @@ -131,20 +168,17 @@ jbyteArray Java_org_rocksdb_RocksDB_get___3BI( /* * Class: org_rocksdb_RocksDB * Method: get - * Signature: ([BI[BI)I + * Signature: (J[BI[BI)I */ -jint Java_org_rocksdb_RocksDB_get___3BI_3BI( - JNIEnv* env, jobject jdb, +jint Java_org_rocksdb_RocksDB_get__J_3BI_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { static const int kNotFound = -1; static const int kStatusError = -2; + auto db = reinterpret_cast(jdb_handle); - rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); - - jboolean isCopy; - jbyte* key = env->GetByteArrayElements(jkey, &isCopy); - jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); + jbyte* key = env->GetByteArrayElements(jkey, 0); rocksdb::Slice key_slice( reinterpret_cast(key), jkey_len); @@ -160,10 +194,8 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); if (s.IsNotFound()) { - env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); return kNotFound; } else if (!s.ok()) { - env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); // Here since we are throwing a Java exception from c++ side. // As a result, c++ does not know calling this function will in fact // throwing an exception. 
As a result, the execution flow will @@ -179,11 +211,65 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( int cvalue_len = static_cast(cvalue.size()); int length = std::min(jvalue_len, cvalue_len); - memcpy(value, cvalue.c_str(), length); - env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT); - return static_cast(cvalue_len); + env->SetByteArrayRegion( + jvalue, 0, length, + reinterpret_cast(cvalue.c_str())); + return cvalue_len; } +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Delete() +void rocksdb_remove_helper( + JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, + jbyteArray jkey, jint jkey_len) { + jbyte* key = env->GetByteArrayElements(jkey, 0); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + + rocksdb::Status s = db->Delete(write_options, key_slice); + + // trigger java unref on key and value. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } + return; +} + +/* + * Class: org_rocksdb_RocksDB + * Method: remove + * Signature: (J[BI)V + */ +void Java_org_rocksdb_RocksDB_remove__J_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + + rocksdb_remove_helper(env, db, default_write_options, jkey, jkey_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: remove + * Signature: (JJ[BI)V + */ +void Java_org_rocksdb_RocksDB_remove__JJ_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jlong jwrite_options, jbyteArray jkey, jint jkey_len) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast(jwrite_options); + + rocksdb_remove_helper(env, db, *write_options, jkey, jkey_len); +} + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::~DB() + /* * Class: org_rocksdb_RocksDB * Method: close0 @@ -192,8 +278,8 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( void Java_org_rocksdb_RocksDB_close0( JNIEnv* env, jobject java_db) { rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, java_db); + assert(db != nullptr); delete db; - db = nullptr; - rocksdb::RocksDBJni::setHandle(env, java_db, db); + rocksdb::RocksDBJni::setHandle(env, java_db, nullptr); } diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc new file mode 100644 index 0000000000..f72c3ba6de --- /dev/null +++ b/java/rocksjni/write_batch.cc @@ -0,0 +1,263 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::WriteBatch methods from Java side. 
+#include + +#include "include/org_rocksdb_WriteBatch.h" +#include "include/org_rocksdb_WriteBatchInternal.h" +#include "include/org_rocksdb_WriteBatchTest.h" +#include "rocksjni/portal.h" +#include "rocksdb/db.h" +#include "db/memtable.h" +#include "rocksdb/write_batch.h" +#include "db/write_batch_internal.h" +#include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" +#include "util/logging.h" +#include "util/testharness.h" + +/* + * Class: org_rocksdb_WriteBatch + * Method: newWriteBatch + * Signature: (I)V + */ +void Java_org_rocksdb_WriteBatch_newWriteBatch( + JNIEnv* env, jobject jobj, jint jreserved_bytes) { + rocksdb::WriteBatch* wb = new rocksdb::WriteBatch( + static_cast(jreserved_bytes)); + + rocksdb::WriteBatchJni::setHandle(env, jobj, wb); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: count + * Signature: ()I + */ +jint Java_org_rocksdb_WriteBatch_count(JNIEnv* env, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + return static_cast(wb->Count()); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: clear + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatch_clear(JNIEnv* env, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + wb->Clear(); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: put + * Signature: ([BI[BI)V + */ +void Java_org_rocksdb_WriteBatch_put( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + jbyte* value = env->GetByteArrayElements(jvalue, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); + wb->Put(key_slice, value_slice); + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: merge + * Signature: ([BI[BI)V + */ +JNIEXPORT void JNICALL Java_org_rocksdb_WriteBatch_merge( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + jbyte* value = env->GetByteArrayElements(jvalue, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); + wb->Merge(key_slice, value_slice); + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: remove + * Signature: ([BI)V + */ +JNIEXPORT void JNICALL Java_org_rocksdb_WriteBatch_remove( + JNIEnv* env, jobject jobj, + jbyteArray jkey, jint jkey_len) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* key = env->GetByteArrayElements(jkey, nullptr); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + wb->Delete(key_slice); + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: putLogData + * Signature: ([BI)V + */ +void Java_org_rocksdb_WriteBatch_putLogData( + JNIEnv* env, jobject jobj, jbyteArray jblob, jint jblob_len) { + rocksdb::WriteBatch* wb = 
rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + jbyte* blob = env->GetByteArrayElements(jblob, nullptr); + rocksdb::Slice blob_slice(reinterpret_cast(blob), jblob_len); + wb->PutLogData(blob_slice); + env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT); +} + +/* + * Class: org_rocksdb_WriteBatch + * Method: dispose0 + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatch_dispose0(JNIEnv* env, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + delete wb; + + rocksdb::WriteBatchJni::setHandle(env, jobj, nullptr); +} + +/* + * Class: org_rocksdb_WriteBatchInternal + * Method: setSequence + * Signature: (Lorg/rocksdb/WriteBatch;J)V + */ +void Java_org_rocksdb_WriteBatchInternal_setSequence( + JNIEnv* env, jclass jclazz, jobject jobj, jlong jsn) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + rocksdb::WriteBatchInternal::SetSequence( + wb, static_cast(jsn)); +} + +/* + * Class: org_rocksdb_WriteBatchInternal + * Method: sequence + * Signature: (Lorg/rocksdb/WriteBatch;)J + */ +jlong Java_org_rocksdb_WriteBatchInternal_sequence( + JNIEnv* env, jclass jclazz, jobject jobj) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + return static_cast(rocksdb::WriteBatchInternal::Sequence(wb)); +} + +/* + * Class: org_rocksdb_WriteBatchInternal + * Method: append + * Signature: (Lorg/rocksdb/WriteBatch;Lorg/rocksdb/WriteBatch;)V + */ +void Java_org_rocksdb_WriteBatchInternal_append( + JNIEnv* env, jclass jclazz, jobject jwb1, jobject jwb2) { + rocksdb::WriteBatch* wb1 = rocksdb::WriteBatchJni::getHandle(env, jwb1); + assert(wb1 != nullptr); + rocksdb::WriteBatch* wb2 = rocksdb::WriteBatchJni::getHandle(env, jwb2); + assert(wb2 != nullptr); + + rocksdb::WriteBatchInternal::Append(wb1, wb2); +} + +/* + * Class: org_rocksdb_WriteBatchTest + * Method: getContents + * Signature: (Lorg/rocksdb/WriteBatch;)[B + */ +jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( + JNIEnv* env, jclass jclazz, jobject jobj) { + rocksdb::WriteBatch* b = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(b != nullptr); + + // todo: Currently the following code is directly copied from + // db/write_bench_test.cc. It could be implemented in java once + // all the necessary components can be accessed via jni api. 
+ + rocksdb::InternalKeyComparator cmp(rocksdb::BytewiseComparator()); + auto factory = std::make_shared(); + rocksdb::Options options; + options.memtable_factory = factory; + rocksdb::MemTable* mem = new rocksdb::MemTable(cmp, options); + mem->Ref(); + std::string state; + rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto(b, mem, &options); + int count = 0; + rocksdb::Iterator* iter = mem->NewIterator(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + rocksdb::ParsedInternalKey ikey; + memset(reinterpret_cast(&ikey), 0, sizeof(ikey)); + ASSERT_TRUE(rocksdb::ParseInternalKey(iter->key(), &ikey)); + switch (ikey.type) { + case rocksdb::kTypeValue: + state.append("Put("); + state.append(ikey.user_key.ToString()); + state.append(", "); + state.append(iter->value().ToString()); + state.append(")"); + count++; + break; + case rocksdb::kTypeMerge: + state.append("Merge("); + state.append(ikey.user_key.ToString()); + state.append(", "); + state.append(iter->value().ToString()); + state.append(")"); + count++; + break; + case rocksdb::kTypeDeletion: + state.append("Delete("); + state.append(ikey.user_key.ToString()); + state.append(")"); + count++; + break; + default: + assert(false); + break; + } + state.append("@"); + state.append(rocksdb::NumberToString(ikey.sequence)); + } + delete iter; + if (!s.ok()) { + state.append(s.ToString()); + } else if (count != rocksdb::WriteBatchInternal::Count(b)) { + state.append("CountMismatch()"); + } + delete mem->Unref(); + + jbyteArray jstate = env->NewByteArray(state.size()); + env->SetByteArrayRegion( + jstate, 0, state.size(), + reinterpret_cast(state.c_str())); + + return jstate; +} + diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index c3adf3ac53..967836811e 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -45,7 +45,9 @@ namespace { // The longest the prefix of the cache key used to identify blocks can be. // We are using the fact that we know for Posix files the unique ID is three // varints. -const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; +// For some reason, compiling for iOS complains that this variable is unused +const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) = + kMaxVarint64Length * 3 + 1; // Read the block identified by "handle" from "file". // The only relevant option is options.verify_checksums for now. diff --git a/tools/auto_sanity_test.sh b/tools/auto_sanity_test.sh new file mode 100755 index 0000000000..2d63c0a85f --- /dev/null +++ b/tools/auto_sanity_test.sh @@ -0,0 +1,71 @@ +TMP_DIR="/tmp/rocksdb-sanity-test" + +if [ "$#" -lt 2 ]; then + echo "usage: ./auto_sanity_test.sh [new_commit] [old_commit]" + echo "Missing either [new_commit] or [old_commit], perform sanity check with the latest and 10th latest commits." + recent_commits=`git log | grep -e "^commit [a-z0-9]\+$"| head -n10 | sed -e 's/commit //g'` + commit_new=`echo "$recent_commits" | head -n1` + commit_old=`echo "$recent_commits" | tail -n1` + echo "the most recent commits are:" + echo "$recent_commits" +else + commit_new=$1 + commit_old=$2 +fi + +if [ ! -d $TMP_DIR ]; then + mkdir $TMP_DIR +fi +dir_new="${TMP_DIR}/${commit_new}" +dir_old="${TMP_DIR}/${commit_old}" + +function makestuff() { + echo "make clean" + make clean > /dev/null + echo "make db_sanity_test -j32" + make db_sanity_test -j32 > /dev/null + if [ $? 
-ne 0 ]; then + echo "[ERROR] Failed to perform 'make db_sanity_test'" + exit 1 + fi +} + +rm -r -f $dir_new +rm -r -f $dir_old + +echo "Running db sanity check with commits $commit_new and $commit_old." + +echo "=============================================================" +echo "Making build $commit_new" +makestuff +mv db_sanity_test new_db_sanity_test +echo "Creating db based on the new commit --- $commit_new" +./new_db_sanity_test $dir_new create + +echo "=============================================================" +echo "Making build $commit_old" +makestuff +mv db_sanity_test old_db_sanity_test +echo "Creating db based on the old commit --- $commit_old" +./old_db_sanity_test $dir_old create + +echo "=============================================================" +echo "Verifying new db $dir_new using the old commit --- $commit_old" +./old_db_sanity_test $dir_new verify +if [ $? -ne 0 ]; then + echo "[ERROR] Verification of $dir_new using commit $commit_old failed." + exit 2 +fi + +echo "=============================================================" +echo "Verifying old db $dir_old using the new commit --- $commit_new" +./new_db_sanity_test $dir_old verify +if [ $? -ne 0 ]; then + echo "[ERROR] Verification of $dir_old using commit $commit_new failed." + exit 2 +fi + +rm old_db_sanity_test +rm new_db_sanity_test + +echo "Auto sanity test passed!" diff --git a/util/crc32c.cc b/util/crc32c.cc index 04312d6f67..d27fb4be98 100644 --- a/util/crc32c.cc +++ b/util/crc32c.cc @@ -314,39 +314,16 @@ static inline void Slow_CRC32(uint64_t* l, uint8_t const **p) { } static inline void Fast_CRC32(uint64_t* l, uint8_t const **p) { - #ifdef __SSE4_2__ +#ifdef __SSE4_2__ *l = _mm_crc32_u64(*l, LE_LOAD64(*p)); *p += 8; - #else +#else Slow_CRC32(l, p); - #endif +#endif } -// Detect if SS42 or not. -static bool isSSE42() { - #ifdef __GNUC__ - uint32_t c_; - uint32_t d_; - __asm__("cpuid" : "=c"(c_), "=d"(d_) : "a"(1) : "ebx"); - return c_ & (1U << 20); // copied from CpuId.h in Folly. - #else - return false; - #endif -} - -typedef void (*Function)(uint64_t*, uint8_t const**); - -static inline Function Choose_CRC32() { - return isSSE42() ? Fast_CRC32 : Slow_CRC32; -} - -static Function func = Choose_CRC32(); - -static inline void CRC32(uint64_t* l, uint8_t const **p) { - func(l, p); -} - -uint32_t Extend(uint32_t crc, const char* buf, size_t size) { +template +uint32_t ExtendImpl(uint32_t crc, const char* buf, size_t size) { const uint8_t *p = reinterpret_cast(buf); const uint8_t *e = p + size; uint64_t l = crc ^ 0xffffffffu; @@ -388,5 +365,29 @@ uint32_t Extend(uint32_t crc, const char* buf, size_t size) { return l ^ 0xffffffffu; } +// Detect if SS42 or not. +static bool isSSE42() { +#if defined(__GNUC__) && defined(__x86_64__) && !defined(IOS_CROSS_COMPILE) + uint32_t c_; + uint32_t d_; + __asm__("cpuid" : "=c"(c_), "=d"(d_) : "a"(1) : "ebx"); + return c_ & (1U << 20); // copied from CpuId.h in Folly. +#else + return false; +#endif +} + +typedef uint32_t (*Function)(uint32_t, const char*, size_t); + +static inline Function Choose_Extend() { + return isSSE42() ? 
ExtendImpl : ExtendImpl; +} + +Function ChosenExtend = Choose_Extend(); + +uint32_t Extend(uint32_t crc, const char* buf, size_t size) { + return ChosenExtend(crc, buf, size); +} + } // namespace crc32c } // namespace rocksdb diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index e8bbc38e17..4a34d509a0 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -3,6 +3,8 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#define __STDC_FORMAT_MACROS +#include #include #include @@ -74,11 +76,12 @@ TEST(DynamicBloomTest, VaryingLengths) { // Count number of filters that significantly exceed the false positive rate int mediocre_filters = 0; int good_filters = 0; + uint32_t num_probes = static_cast(FLAGS_num_probes); fprintf(stderr, "bits_per_key: %d num_probes: %d\n", - FLAGS_bits_per_key, FLAGS_num_probes); + FLAGS_bits_per_key, num_probes); - for (uint32_t cl_per_block = 0; cl_per_block < FLAGS_num_probes; + for (uint32_t cl_per_block = 0; cl_per_block < num_probes; ++cl_per_block) { for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { uint32_t bloom_bits = 0; @@ -88,7 +91,7 @@ TEST(DynamicBloomTest, VaryingLengths) { bloom_bits = std::max(num * FLAGS_bits_per_key, cl_per_block * CACHE_LINE_SIZE * 8); } - DynamicBloom bloom(bloom_bits, cl_per_block, FLAGS_num_probes); + DynamicBloom bloom(bloom_bits, cl_per_block, num_probes); for (uint64_t i = 0; i < num; i++) { bloom.Add(Key(i, buffer)); ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); @@ -127,6 +130,7 @@ TEST(DynamicBloomTest, VaryingLengths) { TEST(DynamicBloomTest, perf) { StopWatchNano timer(Env::Default()); + uint32_t num_probes = static_cast(FLAGS_num_probes); if (!FLAGS_enable_perf) { return; @@ -134,9 +138,9 @@ TEST(DynamicBloomTest, perf) { for (uint64_t m = 1; m <= 8; ++m) { const uint64_t num_keys = m * 8 * 1024 * 1024; - fprintf(stderr, "testing %luM keys\n", m * 8); + fprintf(stderr, "testing %" PRIu64 "M keys\n", m * 8); - DynamicBloom std_bloom(num_keys * 10, 0, FLAGS_num_probes); + DynamicBloom std_bloom(num_keys * 10, 0, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { @@ -144,7 +148,7 @@ TEST(DynamicBloomTest, perf) { } uint64_t elapsed = timer.ElapsedNanos(); - fprintf(stderr, "standard bloom, avg add latency %lu\n", + fprintf(stderr, "standard bloom, avg add latency %" PRIu64 "\n", elapsed / num_keys); uint64_t count = 0; @@ -155,13 +159,13 @@ TEST(DynamicBloomTest, perf) { } } elapsed = timer.ElapsedNanos(); - fprintf(stderr, "standard bloom, avg query latency %lu\n", + fprintf(stderr, "standard bloom, avg query latency %" PRIu64 "\n", elapsed / count); ASSERT_TRUE(count == num_keys); - for (int cl_per_block = 1; cl_per_block <= FLAGS_num_probes; + for (uint32_t cl_per_block = 1; cl_per_block <= num_probes; ++cl_per_block) { - DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, FLAGS_num_probes); + DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { @@ -169,7 +173,7 @@ TEST(DynamicBloomTest, perf) { } uint64_t elapsed = timer.ElapsedNanos(); - fprintf(stderr, "blocked bloom(%d), avg add latency %lu\n", + fprintf(stderr, "blocked bloom(%d), avg add latency %" PRIu64 "\n", cl_per_block, elapsed / num_keys); uint64_t count = 0; @@ -182,7 +186,7 @@ TEST(DynamicBloomTest, perf) { } elapsed = timer.ElapsedNanos(); - fprintf(stderr, "blocked bloom(%d), avg query latency %lu\n", + 
fprintf(stderr, "blocked bloom(%d), avg query latency %" PRIu64 "\n", cl_per_block, elapsed / count); ASSERT_TRUE(count == num_keys); } diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index f1f064fb30..441f5c9939 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -22,12 +22,6 @@ namespace { typedef const char* Key; struct Node { - explicit Node(const Key& k) : - key(k) { - } - - Key const key; - // Accessors/mutators for links. Wrapped in methods so we can // add the appropriate barriers as necessary. Node* Next() { @@ -40,17 +34,19 @@ struct Node { // pointer observes a fully initialized version of the inserted node. next_.Release_Store(x); } - // No-barrier variants that can be safely used in a few locations. Node* NoBarrier_Next() { return reinterpret_cast(next_.NoBarrier_Load()); } + void NoBarrier_SetNext(Node* x) { next_.NoBarrier_Store(x); } -private: + private: port::AtomicPointer next_; + public: + char key[0]; }; class HashLinkListRep : public MemTableRep { @@ -58,7 +54,9 @@ class HashLinkListRep : public MemTableRep { HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, size_t bucket_size); - virtual void Insert(const char* key) override; + virtual KeyHandle Allocate(const size_t len, char** buf) override; + + virtual void Insert(KeyHandle handle) override; virtual bool Contains(const char* key) const override; @@ -93,8 +91,6 @@ class HashLinkListRep : public MemTableRep { const SliceTransform* transform_; const MemTableRep::KeyComparator& compare_; - // immutable after construction - Arena* const arena_; bool BucketContains(Node* head, const Slice& key) const; @@ -114,11 +110,6 @@ class HashLinkListRep : public MemTableRep { return GetBucket(GetHash(slice)); } - Node* NewNode(const Key& key) { - char* mem = arena_->AllocateAligned(sizeof(Node)); - return new (mem) Node(key); - } - bool Equal(const Slice& a, const Key& b) const { return (compare_(b, a) == 0); } @@ -318,10 +309,10 @@ class HashLinkListRep : public MemTableRep { HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, size_t bucket_size) - : bucket_size_(bucket_size), + : MemTableRep(arena), + bucket_size_(bucket_size), transform_(transform), - compare_(compare), - arena_(arena) { + compare_(compare) { char* mem = arena_->AllocateAligned( sizeof(port::AtomicPointer) * bucket_size); @@ -335,15 +326,22 @@ HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, HashLinkListRep::~HashLinkListRep() { } -void HashLinkListRep::Insert(const char* key) { - assert(!Contains(key)); - Slice internal_key = GetLengthPrefixedSlice(key); +KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) { + char* mem = arena_->AllocateAligned(sizeof(Node) + len); + Node* x = new (mem) Node(); + *buf = x->key; + return static_cast(x); +} + +void HashLinkListRep::Insert(KeyHandle handle) { + Node* x = static_cast(handle); + assert(!Contains(x->key)); + Slice internal_key = GetLengthPrefixedSlice(x->key); auto transformed = GetPrefix(internal_key); auto& bucket = buckets_[GetHash(transformed)]; Node* head = static_cast(bucket.Acquire_Load()); if (!head) { - Node* x = NewNode(key); // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. 
x->NoBarrier_SetNext(nullptr); @@ -372,9 +370,7 @@ void HashLinkListRep::Insert(const char* key) { } // Our data structure does not allow duplicate insertion - assert(cur == nullptr || !Equal(key, cur->key)); - - Node* x = NewNode(key); + assert(cur == nullptr || !Equal(x->key, cur->key)); // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index ee92e79522..230fae9573 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -25,7 +25,7 @@ class HashSkipListRep : public MemTableRep { const SliceTransform* transform, size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor); - virtual void Insert(const char* key) override; + virtual void Insert(KeyHandle handle) override; virtual bool Contains(const char* key) const override; @@ -225,7 +225,8 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor) - : bucket_size_(bucket_size), + : MemTableRep(arena), + bucket_size_(bucket_size), skiplist_height_(skiplist_height), skiplist_branching_factor_(skiplist_branching_factor), transform_(transform), @@ -255,7 +256,8 @@ HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket( return bucket; } -void HashSkipListRep::Insert(const char* key) { +void HashSkipListRep::Insert(KeyHandle handle) { + auto* key = static_cast(handle); assert(!Contains(key)); auto transformed = transform_->Transform(UserKey(key)); auto bucket = GetInitializedBucket(transformed); diff --git a/util/perf_context.cc b/util/perf_context.cc index 650abebca0..855e7c45ab 100644 --- a/util/perf_context.cc +++ b/util/perf_context.cc @@ -10,7 +10,11 @@ namespace rocksdb { // by default, enable counts only +#if defined(IOS_CROSS_COMPILE) PerfLevel perf_level = kEnableCount; +#else +__thread PerfLevel perf_level = kEnableCount; +#endif void SetPerfLevel(PerfLevel level) { perf_level = level; } @@ -69,6 +73,9 @@ std::string PerfContext::ToString() const { return ss.str(); } +#if defined(IOS_CROSS_COMPILE) +PerfContext perf_context; +#else __thread PerfContext perf_context; - +#endif } diff --git a/util/perf_context_imp.h b/util/perf_context_imp.h index f7818e69c5..ac044ca098 100644 --- a/util/perf_context_imp.h +++ b/util/perf_context_imp.h @@ -9,7 +9,13 @@ namespace rocksdb { +// TODO(icanadi): when calling perf_context is macro-ed (TODO ljin), make it +// noop in case IOS_CROSS_COMPILE +#if defined(IOS_CROSS_COMPILE) extern enum PerfLevel perf_level; +#else +extern __thread PerfLevel perf_level; +#endif inline void StartPerfTimer(StopWatchNano* timer) { if (perf_level >= PerfLevel::kEnableTime) { diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc index e78e760e9b..93f7134c7d 100644 --- a/util/skiplistrep.cc +++ b/util/skiplistrep.cc @@ -13,13 +13,13 @@ class SkipListRep : public MemTableRep { SkipList skip_list_; public: explicit SkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena) - : skip_list_(compare, arena) { + : MemTableRep(arena), skip_list_(compare, arena) { } // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. - virtual void Insert(const char* key) override { - skip_list_.Insert(key); + virtual void Insert(KeyHandle handle) override { + skip_list_.Insert(static_cast(handle)); } // Returns true iff an entry that compares equal to key is in the list. 
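The memtable changes above replace Insert(const char* key) with a two-step Allocate()/Insert(KeyHandle) protocol, so each rep can co-locate its node header with the key bytes it owns. Below is a minimal sketch, not part of this patch, of how a caller is expected to drive the new interface; the helper name and the pre-encoded entry are assumptions made for illustration.

#include <cstddef>
#include <cstring>
#include "rocksdb/memtablerep.h"

// Hypothetical caller: copies an already length-prefixed entry into
// memory owned by the rep, then publishes the node.
void AddEncodedEntry(rocksdb::MemTableRep* rep,
                     const char* encoded, size_t encoded_len) {
  char* buf = nullptr;
  auto handle = rep->Allocate(encoded_len, &buf);  // rep decides where the key lives
  memcpy(buf, encoded, encoded_len);               // write the entry into that buffer
  rep->Insert(handle);                             // hand the opaque handle back
}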
diff --git a/util/sync_point.cc b/util/sync_point.cc
new file mode 100644
index 0000000000..5d0ac2dd66
--- /dev/null
+++ b/util/sync_point.cc
@@ -0,0 +1,62 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+SyncPoint* SyncPoint::GetInstance() {
+  static SyncPoint sync_point;
+  return &sync_point;
+}
+
+void SyncPoint::LoadDependency(const std::vector<Dependency>& dependencies) {
+  successors_.clear();
+  predecessors_.clear();
+  cleared_points_.clear();
+  for (const auto& dependency : dependencies) {
+    successors_[dependency.predecessor].push_back(dependency.successor);
+    predecessors_[dependency.successor].push_back(dependency.predecessor);
+  }
+}
+
+bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
+  for (const auto& pred : predecessors_[point]) {
+    if (cleared_points_.count(pred) == 0) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void SyncPoint::EnableProcessing() {
+  std::unique_lock<std::mutex> lock(mutex_);
+  enabled_ = true;
+}
+
+void SyncPoint::DisableProcessing() {
+  std::unique_lock<std::mutex> lock(mutex_);
+  enabled_ = false;
+}
+
+void SyncPoint::ClearTrace() {
+  std::unique_lock<std::mutex> lock(mutex_);
+  cleared_points_.clear();
+}
+
+void SyncPoint::Process(const std::string& point) {
+  std::unique_lock<std::mutex> lock(mutex_);
+
+  if (!enabled_) return;
+
+  while (!PredecessorsAllCleared(point)) {
+    cv_.wait(lock);
+  }
+
+  cleared_points_.insert(point);
+  cv_.notify_all();
+}
+
+} // namespace rocksdb
diff --git a/util/sync_point.h b/util/sync_point.h
new file mode 100644
index 0000000000..3cc8923705
--- /dev/null
+++ b/util/sync_point.h
@@ -0,0 +1,79 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+namespace rocksdb {
+
+// This class provides a facility to reproduce race conditions deterministically
+// in unit tests.
+// Developers can specify sync points in the codebase via TEST_SYNC_POINT.
+// Each sync point represents a position in the execution stream of a thread.
+// In a unit test, a 'Happens After' relationship among sync points can be
+// set up via SyncPoint::LoadDependency to reproduce a desired interleaving
+// of thread execution.
+// Refer to (DBTest,TransactionLogIteratorRace) for an example use case.
+
+class SyncPoint {
+ public:
+  static SyncPoint* GetInstance();
+
+  struct Dependency {
+    std::string predecessor;
+    std::string successor;
+  };
+  // call once at the beginning of a test to set up the dependency between
+  // sync points
+  void LoadDependency(const std::vector<Dependency>& dependencies);
+
+  // enable sync point processing (disabled on startup)
+  void EnableProcessing();
+
+  // disable sync point processing
+  void DisableProcessing();
+
+  // remove the execution trace of all sync points
+  void ClearTrace();
+
+  // triggered by TEST_SYNC_POINT, blocking execution until all predecessors
+  // are executed.
+  void Process(const std::string& point);
+
+  // TODO: it might be useful to provide a function that blocks until all
+  // sync points are cleared.
+
+ private:
+  bool PredecessorsAllCleared(const std::string& point);
+
+  // successor/predecessor map loaded from LoadDependency
+  std::unordered_map<std::string, std::vector<std::string>> successors_;
+  std::unordered_map<std::string, std::vector<std::string>> predecessors_;
+
+  std::mutex mutex_;
+  std::condition_variable cv_;
+  // sync points that have been passed through
+  std::unordered_set<std::string> cleared_points_;
+  bool enabled_ = false;
+};
+
+} // namespace rocksdb
+
+// Use TEST_SYNC_POINT to specify sync points inside the code base.
+// Sync points can have a happens-after dependency on other sync points,
+// configured at runtime via SyncPoint::LoadDependency. This could be
+// utilized to reproduce race conditions between threads.
+// See TransactionLogIteratorRace in db_test.cc for an example use case.
+// TEST_SYNC_POINT is a no-op in release builds.
+#ifdef NDEBUG
+#define TEST_SYNC_POINT(x)
+#else
+#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x)
+#endif
diff --git a/util/vectorrep.cc b/util/vectorrep.cc
index 3777f7ffee..14e7c9f918 100644
--- a/util/vectorrep.cc
+++ b/util/vectorrep.cc
@@ -30,7 +30,7 @@ class VectorRep : public MemTableRep {
   // single buffer and pass that in as the parameter to Insert)
   // REQUIRES: nothing that compares equal to key is currently in the
   // collection.
-  virtual void Insert(const char* key) override;
+  virtual void Insert(KeyHandle handle) override;
 
   // Returns true iff an entry that compares equal to key is in the collection.
   virtual bool Contains(const char* key) const override;
@@ -106,7 +106,8 @@ class VectorRep : public MemTableRep {
   const KeyComparator& compare_;
 };
 
-void VectorRep::Insert(const char* key) {
+void VectorRep::Insert(KeyHandle handle) {
+  auto* key = static_cast<char*>(handle);
   assert(!Contains(key));
   WriteLock l(&rwlock_);
   assert(!immutable_);
@@ -134,7 +135,8 @@ size_t VectorRep::ApproximateMemoryUsage() {
 }
 
 VectorRep::VectorRep(const KeyComparator& compare, Arena* arena, size_t count)
-  : bucket_(new Bucket()),
+  : MemTableRep(arena),
+    bucket_(new Bucket()),
     immutable_(false),
     sorted_(false),
     compare_(compare) { bucket_.get()->reserve(count); }
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index f42ea8ccae..f6ffd94870 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -846,7 +846,7 @@ TEST(BackupableDBTest, RateLimiting) {
   auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
                                   backupable_options_->backup_rate_limit;
   ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time);
-  ASSERT_LT(backup_time, 1.5 * rate_limited_backup_time);
+  ASSERT_LT(backup_time, 2.5 * rate_limited_backup_time);
 
   CloseBackupableDB();
 
@@ -858,7 +858,7 @@ TEST(BackupableDBTest, RateLimiting) {
   auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
                                    backupable_options_->restore_rate_limit;
   ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time);
-  ASSERT_LT(restore_time, 1.5 * rate_limited_restore_time);
+  ASSERT_LT(restore_time, 2.5 * rate_limited_restore_time);
 
   AssertBackupConsistency(0, 0, 100000, 100010);
 }
diff --git a/utilities/geodb/geodb_test.cc b/utilities/geodb/geodb_test.cc
index d7af6c32b7..49e72d9d07 100644
--- a/utilities/geodb/geodb_test.cc
+++ b/utilities/geodb/geodb_test.cc
@@ -106,14 +106,14 @@ TEST(GeoDBTest, Search) {
   std::vector<GeoObject> values;
   status = getdb()->SearchRadial(GeoPosition(46, 46), 200000, &values);
   ASSERT_TRUE(status.ok());
-  ASSERT_EQ(values.size(), 1);
+  ASSERT_EQ(values.size(), 1U);
 
   // search all objects centered at 46 degree latitude with
   // a radius of 2 kilometers. There should be none.
   values.clear();
   status = getdb()->SearchRadial(GeoPosition(46, 46), 2, &values);
   ASSERT_TRUE(status.ok());
-  ASSERT_EQ(values.size(), 0);
+  ASSERT_EQ(values.size(), 0U);
 }
 
 } // namespace rocksdb
diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h
index 344ebc3111..90194d21f6 100644
--- a/utilities/ttl/db_ttl.h
+++ b/utilities/ttl/db_ttl.h
@@ -201,7 +201,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
     user_comp_filter_factory_(comp_filter_factory) { }
 
   virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilterContext& context) {
+      const CompactionFilter::Context& context) {
     return std::unique_ptr<CompactionFilter>(
         new TtlCompactionFilter(
             ttl_,
diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc
index a981cceb88..7891287292 100644
--- a/utilities/ttl/ttl_test.cc
+++ b/utilities/ttl/ttl_test.cc
@@ -283,9 +283,8 @@ class TtlTest {
       kNewValue_(kNewValue) {
     }
 
-    virtual std::unique_ptr<CompactionFilter>
-      CreateCompactionFilter(
-        const CompactionFilterContext& context) override {
+    virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+        const CompactionFilter::Context& context) override {
       return std::unique_ptr<CompactionFilter>(
           new TestFilter(kSampleSize_, kNewValue_));
     }
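The SyncPoint facility added in util/sync_point.h is intended to be driven from tests roughly as sketched below. This is not code from this patch: the point names are made up for illustration, and only the calls introduced above (LoadDependency, EnableProcessing and the TEST_SYNC_POINT macro) are used.

#include "util/sync_point.h"

// Force the point named "Writer:Done" to be reached before the point
// named "Reader:Start" is allowed to proceed.
void SetUpWriteBeforeReadRace() {
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"Writer:Done", "Reader:Start"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
}

// In the code under test (debug builds only; TEST_SYNC_POINT is a no-op
// under NDEBUG):
//   TEST_SYNC_POINT("Writer:Done");    // predecessor
//   TEST_SYNC_POINT("Reader:Start");   // blocks until "Writer:Done" clears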