This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new 356efee046 AVRO-4132: [C++] Add ZSTD codec (#3364)
356efee046 is described below

commit 356efee046234004e77a8719d24b9ca03fa79aea
Author: Gang Wu <[email protected]>
AuthorDate: Sun Apr 27 14:58:19 2025 +0800

    AVRO-4132: [C++] Add ZSTD codec (#3364)
    
    * AVRO-4132: [C++] Add ZSTD codec
    
    * assign value to the enum Codec
---
 .github/workflows/test-lang-c++-ARM.yml |  2 +-
 .github/workflows/test-lang-c++.yml     |  2 +-
 lang/c++/CMakeLists.txt                 | 16 +++++-
 lang/c++/impl/DataFile.cc               | 90 +++++++++++++++++++++++++++++++++
 lang/c++/include/avro/DataFile.hh       | 10 ++--
 lang/c++/test/DataFileTests.cc          | 82 ++++++++++++++++++++++++++++++
 6 files changed, 196 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/test-lang-c++-ARM.yml b/.github/workflows/test-lang-c++-ARM.yml
index 759065b089..a34354e20f 100644
--- a/.github/workflows/test-lang-c++-ARM.yml
+++ b/.github/workflows/test-lang-c++-ARM.yml
@@ -44,7 +44,7 @@ jobs:
       - name: Install dependencies
         run: |
           sudo apt-get update -q
-          sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
+          sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev libzstd-dev cmake
 
       - name: Build
         run: |
diff --git a/.github/workflows/test-lang-c++.yml b/.github/workflows/test-lang-c++.yml
index c0c66ceec1..2104c2d26c 100644
--- a/.github/workflows/test-lang-c++.yml
+++ b/.github/workflows/test-lang-c++.yml
@@ -39,7 +39,7 @@ jobs:
       - uses: actions/checkout@v4
 
       - name: Install Dependencies
-        run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake
+        run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev libzstd-dev cmake
 
       - name: Print Versions
         run: |
diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index dacde31840..d4586558a3 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -98,6 +98,16 @@ else ()
     message("Disabled snappy codec.")
 endif ()
 
+find_package(zstd CONFIG)
+if(zstd_FOUND)
+    message("Enabled zstd codec, version: ${zstd_VERSION}")
+    set(ZSTD_TARGET $<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>)
+    add_definitions(-DZSTD_CODEC_AVAILABLE)
+else()
+    message("Disabled zstd codec.")
+    set(ZSTD_TARGET "")
+endif()
+
 # FindZLIB guarantees that ZLIB::ZLIB target exists if found
 # See https://cmake.org/cmake/help/latest/module/FindZLIB.html#imported-targets
 find_package(ZLIB REQUIRED)
@@ -135,18 +145,22 @@ target_link_libraries(avrocpp PUBLIC
   $<BUILD_INTERFACE:fmt::fmt-header-only>
   $<BUILD_INTERFACE:ZLIB::ZLIB>
   $<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+  $<$<BOOL:${zstd_FOUND}>:$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
   $<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
   $<INSTALL_INTERFACE:ZLIB::ZLIB>
   $<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+  $<$<BOOL:${zstd_FOUND}>:$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
   $<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
 )
 target_link_libraries(avrocpp_s PUBLIC
   $<BUILD_INTERFACE:fmt::fmt-header-only>
   $<BUILD_INTERFACE:ZLIB::ZLIB>
   $<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+  $<$<BOOL:${zstd_FOUND}>:$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
   $<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
   $<INSTALL_INTERFACE:ZLIB::ZLIB>
   $<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+  $<$<BOOL:${zstd_FOUND}>:$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
   $<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
 )
 
@@ -205,7 +219,7 @@ if (AVRO_BUILD_TESTS)
 
     macro (unittest name)
         add_executable (${name} test/${name}.cc)
-        target_link_libraries (${name} avrocpp_s Boost::system ZLIB::ZLIB $<TARGET_NAME_IF_EXISTS:Snappy::snappy>)
+        target_link_libraries (${name} avrocpp_s Boost::system ZLIB::ZLIB $<TARGET_NAME_IF_EXISTS:Snappy::snappy> $<$<BOOL:${zstd_FOUND}>:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>)
         add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
             COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
     endmacro (unittest)
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index fb0eaa2ac8..f633b959ca 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -27,6 +27,10 @@
 #include <snappy.h>
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+#include <zstd.h>
+#endif
+
 #include <zlib.h>
 
 namespace avro {
@@ -49,6 +53,10 @@ const string AVRO_DEFLATE_CODEC("deflate");
 const string AVRO_SNAPPY_CODEC = "snappy";
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+const string AVRO_ZSTD_CODEC = "zstd";
+#endif
+
 const size_t minSyncInterval = 32;
 const size_t maxSyncInterval = 1u << 30;
 
@@ -100,6 +108,10 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
 #ifdef SNAPPY_CODEC_AVAILABLE
     } else if (codec_ == SNAPPY_CODEC) {
         setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    } else if (codec_ == ZSTD_CODEC) {
+        setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
 #endif
     } else {
         throw Exception("Unknown codec: {}", int(codec));
@@ -216,6 +228,39 @@ void DataFileWriterBase::sync() {
         avro::encode(*encoderPtr_, byteCount);
         encoderPtr_->flush();
         copy(*in, *stream_);
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    } else if (codec_ == ZSTD_CODEC) {
+        // Read all uncompressed data into a single buffer
+        std::vector<char> uncompressed;
+        const uint8_t *data;
+        size_t len;
+        std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
+        while (input->next(&data, &len)) {
+            uncompressed.insert(uncompressed.end(), reinterpret_cast<const char *>(data),
+                                reinterpret_cast<const char *>(data) + len);
+        }
+
+        // Pre-allocate buffer for compressed data
+        size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
+        std::vector<char> compressed(max_compressed_size);
+
+        // Compress the data using ZSTD block API
+        size_t compressed_size = ZSTD_compress(
+            compressed.data(), max_compressed_size,
+            uncompressed.data(), uncompressed.size(),
+            ZSTD_CLEVEL_DEFAULT);
+
+        if (ZSTD_isError(compressed_size)) {
+            throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
+        }
+
+        compressed.resize(compressed_size);
+        std::unique_ptr<InputStream> in = memoryInputStream(
+            reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
+        avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
+        encoderPtr_->flush();
+        copy(*in, *stream_);
 #endif
     }
 
@@ -431,6 +476,46 @@ void DataFileReaderBase::readDataBlock() {
         dataDecoder_->init(*in);
         dataStream_ = std::move(in);
 #endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    } else if (codec_ == ZSTD_CODEC) {
+        compressed_.clear();
+        const uint8_t *data;
+        size_t len;
+        while (st->next(&data, &len)) {
+            compressed_.insert(compressed_.end(), data, data + len);
+        }
+
+        // Get the decompressed size
+        size_t decompressed_size = ZSTD_getFrameContentSize(
+            reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
+        if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
+            throw Exception("ZSTD: Not a valid compressed frame");
+        } else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
+            throw Exception("ZSTD: Unable to determine decompressed size");
+        }
+
+        // Decompress the data
+        uncompressed.clear();
+        uncompressed.resize(decompressed_size);
+        size_t result = ZSTD_decompress(
+            uncompressed.data(), decompressed_size,
+            reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
+
+        if (ZSTD_isError(result)) {
+            throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
+        }
+        if (result != decompressed_size) {
+            throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
+                            decompressed_size, result);
+        }
+
+        std::unique_ptr<InputStream> in = memoryInputStream(
+            reinterpret_cast<const uint8_t *>(uncompressed.data()),
+            uncompressed.size());
+
+        dataDecoder_->init(*in);
+        dataStream_ = std::move(in);
+#endif
     } else {
         compressed_.clear();
         uncompressed.clear();
@@ -530,6 +615,11 @@ void DataFileReaderBase::readHeader() {
     } else if (it != metadata_.end()
                && toString(it->second) == AVRO_SNAPPY_CODEC) {
         codec_ = SNAPPY_CODEC;
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    } else if (it != metadata_.end()
+               && toString(it->second) == AVRO_ZSTD_CODEC) {
+        codec_ = ZSTD_CODEC;
 #endif
     } else {
         codec_ = NULL_CODEC;
diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh
index 6b2cdab5c1..8ff5d88c61 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -35,11 +35,15 @@ namespace avro {
 
 /** Specify type of compression to use when writing data files. */
 enum Codec {
-    NULL_CODEC,
-    DEFLATE_CODEC,
+    NULL_CODEC = 0,
+    DEFLATE_CODEC = 1,
 
 #ifdef SNAPPY_CODEC_AVAILABLE
-    SNAPPY_CODEC
+    SNAPPY_CODEC = 2,
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+    ZSTD_CODEC = 3,
 #endif
 
 };
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index c360a07dbb..07a6b6b9a3 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -216,6 +216,12 @@ public:
     }
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+    void testWriteWithZstdCodec() {
+        testWriteWithCodec(avro::ZSTD_CODEC);
+    }
+#endif
+
     void testWriteWithCodec(avro::Codec codec) {
         avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100, codec);
         int64_t re = 3;
@@ -618,6 +624,39 @@ public:
     }
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+    void testZstd() {
+        // Add enough objects to span multiple blocks
+        const size_t number_of_objects = 1000000;
+        // first create a large file
+        ValidSchema dschema = avro::compileJsonSchemaFromString(sch);
+        {
+            avro::DataFileWriter<ComplexInteger> writer(
+                filename, dschema, 16 * 1024, avro::ZSTD_CODEC);
+
+            for (size_t i = 0; i < number_of_objects; ++i) {
+                ComplexInteger d;
+                d.re = i;
+                d.im = 2 * i;
+                writer.write(d);
+            }
+        }
+        {
+            avro::DataFileReader<ComplexInteger> reader(filename, dschema);
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            std::vector<int64_t> found;
+            ComplexInteger record;
+            while (reader.read(record)) {
+                found.push_back(record.re);
+            }
+            BOOST_CHECK_EQUAL(found.size(), number_of_objects);
+            for (unsigned int i = 0; i < found.size(); ++i) {
+                BOOST_CHECK_EQUAL(found[i], i);
+            }
+        }
+    }
+#endif
+
     void testSchemaReadWrite() {
         uint32_t a = 42;
         {
@@ -792,6 +831,13 @@ void testSkipStringSnappyCodec() {
 }
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+void testSkipStringZstdCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testSkipString(avro::ZSTD_CODEC);
+}
+#endif
+
 struct TestRecord {
     std::string s1;
     int64_t id;
@@ -1005,6 +1051,13 @@ void testLastSyncSnappyCodec() {
 }
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+void testLastSyncZstdCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testLastSync(avro::ZSTD_CODEC);
+}
+#endif
+
 void testReadRecordEfficientlyUsingLastSyncNullCodec() {
     BOOST_TEST_CHECKPOINT(__func__);
     testReadRecordEfficientlyUsingLastSync(avro::NULL_CODEC);
@@ -1022,6 +1075,13 @@ void testReadRecordEfficientlyUsingLastSyncSnappyCodec() {
 }
 #endif
 
+#ifdef ZSTD_CODEC_AVAILABLE
+void testReadRecordEfficientlyUsingLastSyncZstdCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testReadRecordEfficientlyUsingLastSync(avro::ZSTD_CODEC);
+}
+#endif
+
 test_suite *
 init_unit_test_suite(int, char *[]) {
     {
@@ -1055,6 +1115,16 @@ init_unit_test_suite(int, char *[]) {
         addReaderTests(ts, t1);
         boost::unit_test::framework::master_test_suite().add(ts);
     }
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    {
+        auto *ts = BOOST_TEST_SUITE("DataFile tests: test1.zstd.df");
+        shared_ptr<DataFileTest> t1(new DataFileTest("test1.zstd.df", sch, isch));
+        ts->add(BOOST_CLASS_TEST_CASE(
+            &DataFileTest::testWriteWithZstdCodec, t1));
+        addReaderTests(ts, t1);
+        boost::unit_test::framework::master_test_suite().add(ts);
+    }
 #endif
     {
         auto *ts = BOOST_TEST_SUITE("DataFile tests: test2.df");
@@ -1101,6 +1171,9 @@ init_unit_test_suite(int, char *[]) {
         shared_ptr<DataFileTest> t8(new DataFileTest("test8.df", dsch, dblsch));
 #ifdef SNAPPY_CODEC_AVAILABLE
         ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8));
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+        ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZstd, t8));
 #endif
         boost::unit_test::framework::master_test_suite().add(ts);
     }
@@ -1165,18 +1238,27 @@ init_unit_test_suite(int, char *[]) {
 #ifdef SNAPPY_CODEC_AVAILABLE
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringSnappyCodec));
 #endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringZstdCodec));
+#endif
 
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncNullCodec));
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
 #ifdef SNAPPY_CODEC_AVAILABLE
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncSnappyCodec));
 #endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncZstdCodec));
+#endif
 
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncNullCodec));
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncDeflateCodec));
 #ifdef SNAPPY_CODEC_AVAILABLE
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncSnappyCodec));
 #endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncZstdCodec));
+#endif
 
     return nullptr;
 }

Reply via email to