CMakeLists.txt | 23 +++++++------- HISTORY.md | 4 +++ Makefile | 12 +++---- build_tools/build_detect_platform | 2 +- cmake/modules/Finduring.cmake | 2 +- db/db_test.cc | 4 +++ db/log_reader.cc | 14 ++++---- db/log_test.cc | 37 ++++++++++++++++++++-- file/prefetch_test.cc | 3 +- port/port_posix.h | 2 ++ table/block_based/data_block_hash_index.h | 1 + tools/CMakeLists.txt | 5 +++ util/compression.cc | 4 +-- util/compression.h | 7 ++-- util/string_util.h | 1 + util/xxhash.h | 3 +- .../range/range_tree/lib/portability/toku_atomic.h | 12 +++---- .../range/range_tree/lib/portability/toku_time.h | 36 +++++++++++++++++++-- 18 files changed, 131 insertions(+), 41 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index dbef05902..85136f4aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -210,17 +210,6 @@ endif() include(CheckCCompilerFlag) if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64") - CHECK_C_COMPILER_FLAG("-mcpu=power9" HAS_POWER9) - if(HAS_POWER9) - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mcpu=power9 -mtune=power9") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcpu=power9 -mtune=power9") - else() - CHECK_C_COMPILER_FLAG("-mcpu=power8" HAS_POWER8) - if(HAS_POWER8) - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mcpu=power8 -mtune=power8") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcpu=power8 -mtune=power8") - endif(HAS_POWER8) - endif(HAS_POWER9) CHECK_C_COMPILER_FLAG("-maltivec" HAS_ALTIVEC) if(HAS_ALTIVEC) message(STATUS " HAS_ALTIVEC yes") @@ -245,6 +234,15 @@ if(CMAKE_SYSTEM_PROCESSOR MATCHES "s390x") endif(HAS_S390X_MARCH_NATIVE) endif(CMAKE_SYSTEM_PROCESSOR MATCHES "s390x") +if(CMAKE_SYSTEM_PROCESSOR MATCHES "loongarch64") + CHECK_C_COMPILER_FLAG("-march=loongarch64" HAS_LOONGARCH64) + if(HAS_LOONGARCH64) + # GCC's LoongArch port selects the ISA with -march (the flag probed above) and has no -mcpu option. 
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=loongarch64 -mtune=loongarch64") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=loongarch64 -mtune=loongarch64") + endif(HAS_LOONGARCH64) +endif(CMAKE_SYSTEM_PROCESSOR MATCHES "loongarch64") + option(PORTABLE 
"build a portable binary" OFF) option(FORCE_SSE42 "force building with SSE4.2, even when PORTABLE=ON" OFF) option(FORCE_AVX "force building with AVX, even when PORTABLE=ON" OFF) @@ -275,6 +272,9 @@ if(PORTABLE) if(CMAKE_SYSTEM_PROCESSOR MATCHES "^s390x") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=z196") endif() + if(CMAKE_SYSTEM_PROCESSOR MATCHES "^loongarch64") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=loongarch64") + endif() endif() else() if(MSVC) @@ -1531,6 +1531,7 @@ if(WITH_TESTS) endif() if(WITH_BENCHMARK_TOOLS) + set(CMAKE_SKIP_BUILD_RPATH TRUE) add_executable(db_bench${ARTIFACT_SUFFIX} tools/simulated_hybrid_file_system.cc tools/db_bench.cc diff --git a/HISTORY.md b/HISTORY.md index 75aee6474..04eee2011 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## Unreleased +### Bug Fixes +* Fixed a bug in DB open/recovery from a compressed WAL that was caused due to incorrect handling of certain record fragments with the same offset within a WAL block. + ## 7.9.3 (02/01/2023) ### Bug Fixes * Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes. diff --git a/Makefile b/Makefile index 06f2e32a2..2797adf56 100644 --- a/Makefile +++ b/Makefile @@ -233,8 +233,8 @@ am__v_AR_ = $(am__v_AR_$(AM_DEFAULT_VERBOSITY)) am__v_AR_0 = @echo " AR " $@; am__v_AR_1 = -AM_LINK = $(AM_V_CCLD)$(CXX) -L. $(patsubst lib%.a, -l%, $(patsubst lib%.$(PLATFORM_SHARED_EXT), -l%, $^)) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) -AM_SHARE = $(AM_V_CCLD) $(CXX) $(PLATFORM_SHARED_LDFLAGS)$@ -L. $(patsubst lib%.$(PLATFORM_SHARED_EXT), -l%, $^) $(EXEC_LDFLAGS) $(LDFLAGS) -o $@ +AM_LINK = $(AM_V_CCLD) $(CXX) $(COVERAGEFLAGS) -o $@ $(LDFLAGS) $(EXEC_LDFLAGS) -L. $(patsubst lib%.a, -l%, $(patsubst lib%.$(PLATFORM_SHARED_EXT), -l%, $^)) +AM_SHARE = $(AM_V_CCLD) $(CXX) $(PLATFORM_SHARED_LDFLAGS)$@ -o $@ $(LDFLAGS) $(EXEC_LDFLAGS) -L. $(patsubst lib%.$(PLATFORM_SHARED_EXT), -l%, $^) # Detect what platform we're building on. 
# Export some common variables that might have been passed as Make variables @@ -868,7 +868,7 @@ $(SHARED3): $(SHARED4) endif # PLATFORM_SHARED_VERSIONED $(SHARED4): $(LIB_OBJECTS) - $(AM_V_CCLD) $(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED3) $(LIB_OBJECTS) $(LDFLAGS) -o $@ + $(AM_V_CCLD) $(CXX) $(LIB_OBJECTS) -o $@ $(PLATFORM_SHARED_LDFLAGS)$(SHARED3) $(LDFLAGS) endif # PLATFORM_SHARED_EXT .PHONY: check clean coverage ldb_tests package dbg gen-pc build_size \ @@ -1872,7 +1872,7 @@ ldb: $(OBJ_DIR)/tools/ldb.o $(TOOLS_LIBRARY) $(LIBRARY) $(AM_LINK) iostats_context_test: $(OBJ_DIR)/monitoring/iostats_context_test.o $(TEST_LIBRARY) $(LIBRARY) - $(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) + $(AM_V_CCLD)$(CXX) $^ -o $@ $(LDFLAGS) $(EXEC_LDFLAGS) persistent_cache_test: $(OBJ_DIR)/utilities/persistent_cache/persistent_cache_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) @@ -2058,7 +2058,7 @@ JAVA_INCLUDE = -I$(JAVA_HOME)/include/ -I$(JAVA_HOME)/include/linux ifeq ($(PLATFORM), OS_SOLARIS) ARCH := $(shell isainfo -b) else ifeq ($(PLATFORM), OS_OPENBSD) - ifneq (,$(filter amd64 ppc64 ppc64le s390x arm64 aarch64 sparc64, $(MACHINE))) + ifneq (,$(filter amd64 ppc64 ppc64le s390x arm64 aarch64 sparc64 loongarch64, $(MACHINE))) ARCH := 64 else ARCH := 32 @@ -2079,7 +2079,7 @@ ifneq ($(origin JNI_LIBC), undefined) endif ifeq (,$(ROCKSDBJNILIB)) -ifneq (,$(filter ppc% s390x arm64 aarch64 sparc64, $(MACHINE))) +ifneq (,$(filter ppc% s390x arm64 aarch64 sparc64 loongarch64, $(MACHINE))) ROCKSDBJNILIB = librocksdbjni-linux-$(MACHINE)$(JNI_LIBC_POSTFIX).so else ROCKSDBJNILIB = librocksdbjni-linux$(ARCH)$(JNI_LIBC_POSTFIX).so diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 15129411a..a96083d81 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -136,7 +136,7 @@ CROSS_COMPILE= PLATFORM_CCFLAGS= PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS" PLATFORM_SHARED_EXT="so" -PLATFORM_SHARED_LDFLAGS="-Wl,--no-as-needed 
-shared -Wl,-soname -Wl," +PLATFORM_SHARED_LDFLAGS="-shared -Wl,-soname -Wl," PLATFORM_SHARED_CFLAGS="-fPIC" PLATFORM_SHARED_VERSIONED=true diff --git a/cmake/modules/Finduring.cmake b/cmake/modules/Finduring.cmake index 8cb14cb27..6928e53ae 100644 --- a/cmake/modules/Finduring.cmake +++ b/cmake/modules/Finduring.cmake @@ -7,7 +7,7 @@ find_path(uring_INCLUDE_DIR NAMES liburing.h) find_library(uring_LIBRARIES - NAMES liburing.a liburing) + NAMES liburing.so liburing) include(FindPackageHandleStandardArgs) find_package_handle_standard_args(uring diff --git a/db/db_test.cc b/db/db_test.cc index 9575248b4..e7229e396 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7191,7 +7191,11 @@ TEST_F(DBTest, LargeBlockSizeTest) { CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(Put(0, "foo", "bar")); BlockBasedTableOptions table_options; +#if !defined(__i386__) && !defined(__mips__) table_options.block_size = 8LL * 1024 * 1024 * 1024LL; +#else + table_options.block_size = 3LL * 1024 * 1024 * 1024LL; +#endif options.table_factory.reset(NewBlockBasedTableFactory(table_options)); ASSERT_NOK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); } diff --git a/db/log_reader.cc b/db/log_reader.cc index a21868776..575a7d758 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -515,10 +515,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, size_t uncompressed_size = 0; int remaining = 0; + const char* input = header + header_size; do { - remaining = uncompress_->Uncompress(header + header_size, length, - uncompressed_buffer_.get(), - &uncompressed_size); + remaining = uncompress_->Uncompress( + input, length, uncompressed_buffer_.get(), &uncompressed_size); + input = nullptr; if (remaining < 0) { buffer_.clear(); return kBadRecord; @@ -830,10 +831,11 @@ bool FragmentBufferedReader::TryReadFragment( uncompressed_record_.clear(); size_t uncompressed_size = 0; int remaining = 0; + const char* input = header + header_size; do { - remaining = 
uncompress_->Uncompress(header + header_size, length, - uncompressed_buffer_.get(), - &uncompressed_size); + remaining = uncompress_->Uncompress( + input, length, uncompressed_buffer_.get(), &uncompressed_size); + input = nullptr; if (remaining < 0) { buffer_.clear(); *fragment_type_or_err = kBadRecord; diff --git a/db/log_test.cc b/db/log_test.cc index 2a43dc152..f4d388f41 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -979,6 +979,38 @@ TEST_P(CompressionLogTest, Fragmentation) { ASSERT_EQ("EOF", Read()); } +TEST_P(CompressionLogTest, AlignedFragmentation) { + CompressionType compression_type = std::get<2>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } + ASSERT_OK(SetupTestEnv()); + Random rnd(301); + int num_filler_records = 0; + // Keep writing small records until the next record will be aligned at the + // beginning of the block. + while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) { + char entry = 'a'; + ASSERT_OK(writer_->AddRecord(Slice(&entry, 1))); + num_filler_records++; + } + const std::vector<std::string> wal_entries = { + rnd.RandomBinaryString(3 * kBlockSize), + }; + for (const std::string& wal_entry : wal_entries) { + Write(wal_entry); + } + + for (int i = 0; i < num_filler_records; ++i) { + ASSERT_EQ("a", Read()); + } + for (const std::string& wal_entry : wal_entries) { + ASSERT_EQ(wal_entry, Read()); + } + ASSERT_EQ("EOF", Read()); +} + INSTANTIATE_TEST_CASE_P( Compression, CompressionLogTest, ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), @@ -1026,10 +1058,11 @@ TEST_P(StreamingCompressionTest, Basic) { for (int i = 0; i < (int)compressed_buffers.size(); i++) { // Call uncompress till either the entire input is consumed or the output // buffer size is equal to the allocated output buffer size.
+ const char* input = compressed_buffers[i].c_str(); do { - ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(), - compressed_buffers[i].size(), + ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(), uncompressed_output_buffer, &output_pos); + input = nullptr; if (output_pos > 0) { std::string uncompressed_fragment; uncompressed_fragment.assign(uncompressed_output_buffer, output_pos); diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 438286bfc..11cb841b9 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -2090,7 +2090,8 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) { // Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings, // it will do two reads of 4096+8192 and 8192 Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 0, 4096, &result); - ASSERT_EQ(s, Status::TryAgain()); + // Platforms that don't have IO uring may not support async IO + ASSERT_TRUE(s.IsTryAgain() || s.IsNotSupported()); // Simulate a block cache hit fpb.UpdateReadPattern(0, 4096, false); // Now read some data that straddles the two prefetch buffers - offset 8192 to diff --git a/port/port_posix.h b/port/port_posix.h index ec6aa281d..417fbf4f6 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -169,6 +169,8 @@ static inline void AsmVolatilePause() { asm volatile("isb"); #elif defined(__powerpc64__) asm volatile("or 27,27,27"); +#elif defined(__loongarch64) + asm volatile("dbar 0"); #endif // it's okay for other platforms to be no-ops } diff --git a/table/block_based/data_block_hash_index.h b/table/block_based/data_block_hash_index.h index f356395f3..321522175 100644 --- a/table/block_based/data_block_hash_index.h +++ b/table/block_based/data_block_hash_index.h @@ -5,6 +5,7 @@ #pragma once +#include #include #include diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 19030e84b..66c84bbef 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -1,3 +1,5 @@ +set(CMAKE_SKIP_BUILD_RPATH 
TRUE) + set(CORE_TOOLS sst_dump.cc ldb.cc) @@ -7,6 +9,9 @@ foreach(src ${CORE_TOOLS}) ${src}) target_link_libraries(${exename}${ARTIFACT_SUFFIX} ${ROCKSDB_LIB}) list(APPEND core_tool_deps ${exename}) + install(TARGETS ${exename}${ARTIFACT_SUFFIX} + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + COMPONENT runtime) endforeach() if(WITH_TOOLS) diff --git a/util/compression.cc b/util/compression.cc index 8e2f01b12..712d333ee 100644 --- a/util/compression.cc +++ b/util/compression.cc @@ -85,14 +85,14 @@ void ZSTDStreamingCompress::Reset() { int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size, char* output, size_t* output_pos) { - assert(input != nullptr && output != nullptr && output_pos != nullptr); + assert(output != nullptr && output_pos != nullptr); *output_pos = 0; // Don't need to uncompress an empty input if (input_size == 0) { return 0; } #ifdef ZSTD_STREAMING - if (input_buffer_.src != input) { + if (input) { // New input input_buffer_ = {input, input_size, /*pos=*/0}; } diff --git a/util/compression.h b/util/compression.h index 0d4febcfb..dbc3d41cd 100644 --- a/util/compression.h +++ b/util/compression.h @@ -1705,8 +1705,11 @@ class StreamingUncompress { compress_format_version_(compress_format_version), max_output_len_(max_output_len) {} virtual ~StreamingUncompress() = default; - // uncompress should be called again with the same input if output_size is - // equal to max_output_len or with the next input fragment. + // Uncompress can be called repeatedly to progressively process the same + // input buffer, or can be called with a new input buffer. When the input + // buffer is not fully consumed, the return value is > 0 or output_size + // == max_output_len. When calling uncompress to continue processing the + // same input buffer, the input argument should be nullptr.
// Parameters: // input - buffer to uncompress // input_size - size of input buffer diff --git a/util/string_util.h b/util/string_util.h index 55d106fff..11178fd1d 100644 --- a/util/string_util.h +++ b/util/string_util.h @@ -6,6 +6,7 @@ #pragma once +#include #include #include #include diff --git a/util/xxhash.h b/util/xxhash.h index 195f06b39..fa483b7eb 100644 --- a/util/xxhash.h +++ b/util/xxhash.h @@ -1287,7 +1287,8 @@ XXH_PUBLIC_API XXH128_hash_t XXH128(const void* data, size_t len, XXH64_hash_t s #ifndef XXH_FORCE_ALIGN_CHECK /* can be defined externally */ # if defined(__i386) || defined(__x86_64__) || defined(__aarch64__) \ - || defined(_M_IX86) || defined(_M_X64) || defined(_M_ARM64) /* visual */ + || defined(_M_IX86) || defined(_M_X64) || defined(_M_ARM64) /* visual */ \ + || defined(__loongarch64) # define XXH_FORCE_ALIGN_CHECK 0 # else # define XXH_FORCE_ALIGN_CHECK 1 diff --git a/utilities/transactions/lock/range/range_tree/lib/portability/toku_atomic.h b/utilities/transactions/lock/range/range_tree/lib/portability/toku_atomic.h index aaa2298fa..938590280 100644 --- a/utilities/transactions/lock/range/range_tree/lib/portability/toku_atomic.h +++ b/utilities/transactions/lock/range/range_tree/lib/portability/toku_atomic.h @@ -77,37 +77,41 @@ template __attribute__((always_inline)) static inline T toku_sync_fetch_and_add(T *addr, U diff) { paranoid_invariant(!crosses_boundary(addr, sizeof *addr)); - return __sync_fetch_and_add(addr, diff); + return __atomic_fetch_add(addr, diff, __ATOMIC_SEQ_CST); } template __attribute__((always_inline)) static inline T toku_sync_add_and_fetch(T *addr, U diff) { paranoid_invariant(!crosses_boundary(addr, sizeof *addr)); - return __sync_add_and_fetch(addr, diff); + return __atomic_add_fetch(addr, diff, __ATOMIC_SEQ_CST); } template __attribute__((always_inline)) static inline T toku_sync_fetch_and_sub(T *addr, U diff) { paranoid_invariant(!crosses_boundary(addr, sizeof *addr)); - return __sync_fetch_and_sub(addr, diff); + return 
__atomic_fetch_sub(addr, diff, __ATOMIC_SEQ_CST); } template __attribute__((always_inline)) static inline T toku_sync_sub_and_fetch(T *addr, U diff) { paranoid_invariant(!crosses_boundary(addr, sizeof *addr)); - return __sync_sub_and_fetch(addr, diff); + return __atomic_sub_fetch(addr, diff, __ATOMIC_SEQ_CST); } template __attribute__((always_inline)) static inline T toku_sync_val_compare_and_swap( T *addr, U oldval, V newval) { paranoid_invariant(!crosses_boundary(addr, sizeof *addr)); - return __sync_val_compare_and_swap(addr, oldval, newval); + // On failure the builtin writes the current *addr into expected; on success expected already equals the old value, so it is the prior value either way. + T expected = oldval; + __atomic_compare_exchange_n(addr, &expected, newval, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); + return expected; } template __attribute__((always_inline)) static inline bool toku_sync_bool_compare_and_swap(T *addr, U oldval, V newval) { paranoid_invariant(!crosses_boundary(addr, sizeof *addr)); - return __sync_bool_compare_and_swap(addr, oldval, newval); + T expected = oldval; + return __atomic_compare_exchange_n(addr, &expected, newval, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); } // in case you include this but not toku_portability.h diff --git a/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h b/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h index 46111e7f0..367fb955a 100644 --- a/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h +++ b/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h @@ -58,7 +58,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include #include #include -#if defined(__powerpc__) +#if defined(__powerpc__) && defined(__GLIBC__) #include #endif @@ -131,8 +131,16 @@ static inline tokutime_t toku_time_now(void) { uint64_t result; __asm __volatile__("mrs %[rt], cntvct_el0" : [rt] "=r"(result)); return result; -#elif defined(__powerpc__) +#elif defined(__arm__) + uint32_t lo, hi; + __asm __volatile__("mrrc p15, 1, %[lo], %[hi], c14" : [ lo ] "=r" (lo), [hi] "=r" (hi)); + return (uint64_t)hi << 32 | lo; +#elif defined(__powerpc__) && defined(__GLIBC__) return __ppc_get_timebase(); +#elif defined(__powerpc64__) || defined(__ppc64__) + uint64_t result; + asm volatile("mfspr %0, 268" : "=r"(result)); + return result; #elif defined(__s390x__) uint64_t result; asm volatile("stckf %0" : "=Q"(result) : : "cc"); @@ -154,6 +162,16 @@ static inline tokutime_t toku_time_now(void) { uint64_t cycles; asm volatile("rdcycle %0" : "=r"(cycles)); return cycles; +#elif defined(__loongarch64) + unsigned long result; + asm volatile ("rdtime.d\t%0,$r0" : "=r" (result)); + return result; +#elif defined(__mips__) + // mips apparently only allows rdtsc for superusers, so we fall + // back to gettimeofday. It's possible clock_gettime would be better. + struct timeval tv; + gettimeofday(&tv, nullptr); + return (uint64_t)tv.tv_sec * 1000000 + tv.tv_usec; #else #error No timer implementation for this platform #endif