This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new 11fb55500b AVRO-4222: [C++] Support writer to specify compression level (#3610)
11fb55500b is described below

commit 11fb55500bed9fbe9af53b85112cd13887f0ce80
Author: Gang Wu <[email protected]>
AuthorDate: Fri Jan 9 14:31:45 2026 +0800

    AVRO-4222: [C++] Support writer to specify compression level (#3610)
    
    * AVRO-4222: [C++] Support writer to specify compression level
    
    * address feedback
---
 lang/c++/impl/DataFile.cc            | 240 +++++++++++++++++++++++++----------
 lang/c++/impl/ZstdCompressWrapper.cc |   4 +-
 lang/c++/impl/ZstdCompressWrapper.hh |   3 +-
 lang/c++/include/avro/DataFile.hh    |  26 ++--
 lang/c++/test/DataFileTests.cc       | 229 +++++++++++++++++++++++++++++++++
 5 files changed, 423 insertions(+), 79 deletions(-)

diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index e605a6f448..3a93a5fdf5 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -47,53 +47,183 @@ using std::array;
 namespace {
 const string AVRO_SCHEMA_KEY("avro.schema");
 const string AVRO_CODEC_KEY("avro.codec");
-const string AVRO_NULL_CODEC("null");
-const string AVRO_DEFLATE_CODEC("deflate");
 
+const size_t minSyncInterval = 32;
+const size_t maxSyncInterval = 1u << 30;
+
+// Recommended by https://www.zlib.net/zlib_how.html
+const size_t zlibBufGrowSize = 128 * 1024;
+
+template<Codec codec>
+struct codec_trait {
+    static std::string name() {
+        throw Exception("Unsupported codec: {}", static_cast<int>(codec));
+    }
+    static void validate(std::optional<int> level) {
+        throw Exception("Unsupported codec: {}", static_cast<int>(codec));
+    }
+    static bool available() {
+        throw Exception("Unsupported codec: {}", static_cast<int>(codec));
+    }
+};
+
+template<>
+struct codec_trait<NULL_CODEC> {
+    static std::string name() {
+        return "null";
+    }
+    static void validate(std::optional<int> /*level*/) {}
+    static bool available() {
+        return true;
+    }
+};
+
+template<>
+struct codec_trait<DEFLATE_CODEC> {
+    static std::string name() {
+        return "deflate";
+    }
+
+    static void validate(std::optional<int> level) {
+        if (!level.has_value()) {
+            return;
+        }
+        int levelValue = level.value();
+        if (levelValue < 0 || levelValue > 9) {
+            throw Exception("Invalid compression level {} for deflate codec. "
+                            "Valid range is 0-9.",
+                            levelValue);
+        }
+    }
+
+    static bool available() {
+        return true;
+    }
+};
+
+template<>
+struct codec_trait<SNAPPY_CODEC> {
+    static std::string name() {
+        return "snappy";
+    }
+
+    static void validate(std::optional<int> /*level*/) {
+    }
+
+    static bool available() {
 #ifdef SNAPPY_CODEC_AVAILABLE
-const string AVRO_SNAPPY_CODEC = "snappy";
+        return true;
+#else
+        return false;
 #endif
+    }
+};
+
+template<>
+struct codec_trait<ZSTD_CODEC> {
+    static std::string name() {
+        return "zstandard";
+    }
 
+    static void validate(std::optional<int> level) {
+        if (!level.has_value()) {
+            return;
+        }
+        int levelValue = level.value();
+        if (levelValue < 1 || levelValue > 22) {
+            throw Exception("Invalid compression level {} for zstandard codec. 
"
+                            "Valid range is 1-22.",
+                            levelValue);
+        }
+    }
+
+    static bool available() {
 #ifdef ZSTD_CODEC_AVAILABLE
-const string AVRO_ZSTD_CODEC = "zstandard";
+        return true;
+#else
+        return false;
 #endif
+    }
+};
 
-const size_t minSyncInterval = 32;
-const size_t maxSyncInterval = 1u << 30;
+#define DISPATCH_CODEC_FUNC(codec, func, ...)                              \
+    switch (codec) {                                                       \
+        case NULL_CODEC:                                                   \
+            return codec_trait<NULL_CODEC>::func(__VA_ARGS__);             \
+        case DEFLATE_CODEC:                                                \
+            return codec_trait<DEFLATE_CODEC>::func(__VA_ARGS__);          \
+        case SNAPPY_CODEC:                                                 \
+            return codec_trait<SNAPPY_CODEC>::func(__VA_ARGS__);           \
+        case ZSTD_CODEC:                                                   \
+            return codec_trait<ZSTD_CODEC>::func(__VA_ARGS__);             \
+        default:                                                           \
+            throw Exception("Unknown codec: {}", static_cast<int>(codec)); \
+    }
 
-// Recommended by https://www.zlib.net/zlib_how.html
-const size_t zlibBufGrowSize = 128 * 1024;
+std::string getCodecName(Codec codec) {
+    DISPATCH_CODEC_FUNC(codec, name);
+}
+
+void validateCodec(Codec codec, std::optional<int> level) {
+    if (!isCodecAvailable(codec)) {
+        throw Exception("Codec {} is not available.", getCodecName(codec));
+    }
+    DISPATCH_CODEC_FUNC(codec, validate, level);
+}
+
+Codec getCodec(const std::string &name) {
+    if (name == codec_trait<NULL_CODEC>::name()) {
+        return NULL_CODEC;
+    } else if (name == codec_trait<DEFLATE_CODEC>::name()) {
+        return DEFLATE_CODEC;
+    } else if (name == codec_trait<SNAPPY_CODEC>::name()) {
+        return SNAPPY_CODEC;
+    } else if (name == codec_trait<ZSTD_CODEC>::name()) {
+        return ZSTD_CODEC;
+    } else {
+        throw Exception("Unknown codec name: {}", name);
+    }
+}
 
 } // namespace
 
+bool isCodecAvailable(Codec codec) {
+    DISPATCH_CODEC_FUNC(codec, available);
+}
+
+#undef DISPATCH_CODEC_FUNC
+
 DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema 
&schema, size_t syncInterval,
-                                       Codec codec, const Metadata &metadata) 
: filename_(filename),
-                                                                               
 schema_(schema),
-                                                                               
 encoderPtr_(binaryEncoder()),
-                                                                               
 syncInterval_(syncInterval),
-                                                                               
 codec_(codec),
-                                                                               
 stream_(fileOutputStream(filename)),
-                                                                               
 buffer_(memoryOutputStream()),
-                                                                               
 sync_(makeSync()),
-                                                                               
 objectCount_(0),
-                                                                               
 metadata_(metadata),
-                                                                               
 lastSync_(0) {
+                                       Codec codec, const Metadata &metadata,
+                                       std::optional<int> compressionLevel) : 
filename_(filename),
+                                                                              
schema_(schema),
+                                                                              
encoderPtr_(binaryEncoder()),
+                                                                              
syncInterval_(syncInterval),
+                                                                              
codec_(codec),
+                                                                              
compressionLevel_(compressionLevel),
+                                                                              
stream_(fileOutputStream(filename)),
+                                                                              
buffer_(memoryOutputStream()),
+                                                                              
sync_(makeSync()),
+                                                                              
objectCount_(0),
+                                                                              
metadata_(metadata),
+                                                                              
lastSync_(0) {
     init(schema, syncInterval, codec);
 }
 
-DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> 
outputStream,
-                                       const ValidSchema &schema, size_t 
syncInterval,
-                                       Codec codec, const Metadata &metadata) 
: filename_(),
-                                                                               
 schema_(schema),
-                                                                               
 encoderPtr_(binaryEncoder()),
-                                                                               
 syncInterval_(syncInterval),
-                                                                               
 codec_(codec),
-                                                                               
 stream_(std::move(outputStream)),
-                                                                               
 buffer_(memoryOutputStream()),
-                                                                               
 sync_(makeSync()),
-                                                                               
 objectCount_(0),
-                                                                               
 metadata_(metadata),
-                                                                               
 lastSync_(0) {
+DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> 
outputStream, const ValidSchema &schema,
+                                       size_t syncInterval, Codec codec, const 
Metadata &metadata,
+                                       std::optional<int> compressionLevel) : 
filename_(),
+                                                                              
schema_(schema),
+                                                                              
encoderPtr_(binaryEncoder()),
+                                                                              
syncInterval_(syncInterval),
+                                                                              
codec_(codec),
+                                                                              
compressionLevel_(compressionLevel),
+                                                                              
stream_(std::move(outputStream)),
+                                                                              
buffer_(memoryOutputStream()),
+                                                                              
sync_(makeSync()),
+                                                                              
objectCount_(0),
+                                                                              
metadata_(metadata),
+                                                                              
lastSync_(0) {
     init(schema, syncInterval, codec);
 }
 
@@ -103,23 +233,9 @@ void DataFileWriterBase::init(const ValidSchema &schema, 
size_t syncInterval, co
             "Invalid sync interval: {}. Should be between {} and {}",
             syncInterval, minSyncInterval, maxSyncInterval);
     }
-    setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
 
-    if (codec_ == NULL_CODEC) {
-        setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
-    } else if (codec_ == DEFLATE_CODEC) {
-        setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
-#ifdef SNAPPY_CODEC_AVAILABLE
-    } else if (codec_ == SNAPPY_CODEC) {
-        setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
-#endif
-#ifdef ZSTD_CODEC_AVAILABLE
-    } else if (codec_ == ZSTD_CODEC) {
-        setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
-#endif
-    } else {
-        throw Exception("Unknown codec: {}", int(codec));
-    }
+    validateCodec(codec, compressionLevel_);
+    setMetadata(AVRO_CODEC_KEY, getCodecName(codec));
     setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
 
     writeHeader();
@@ -160,7 +276,10 @@ void DataFileWriterBase::sync() {
             zs.zfree = Z_NULL;
             zs.opaque = Z_NULL;
 
-            int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 
-15, 8, Z_DEFAULT_STRATEGY);
+            // Use Z_DEFAULT_COMPRESSION if no level specified
+            int effectiveLevel = 
compressionLevel_.value_or(Z_DEFAULT_COMPRESSION);
+
+            int ret = deflateInit2(&zs, effectiveLevel, Z_DEFLATED, -15, 8, 
Z_DEFAULT_STRATEGY);
             if (ret != Z_OK) {
                 throw Exception("Failed to initialize deflate, error: {}", 
ret);
             }
@@ -246,7 +365,7 @@ void DataFileWriterBase::sync() {
         }
 
         ZstdCompressWrapper zstdCompressWrapper;
-        std::vector<char> compressed = 
zstdCompressWrapper.compress(uncompressed);
+        std::vector<char> compressed = 
zstdCompressWrapper.compress(uncompressed, compressionLevel_);
 
         std::unique_ptr<InputStream> in = memoryInputStream(
             reinterpret_cast<const uint8_t *>(compressed.data()), 
compressed.size());
@@ -580,23 +699,16 @@ void DataFileReaderBase::readHeader() {
         readerSchema_ = dataSchema();
     }
 
+    // Parse codec from metadata using codec_trait
     it = metadata_.find(AVRO_CODEC_KEY);
-    if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
-        codec_ = DEFLATE_CODEC;
-#ifdef SNAPPY_CODEC_AVAILABLE
-    } else if (it != metadata_.end()
-               && toString(it->second) == AVRO_SNAPPY_CODEC) {
-        codec_ = SNAPPY_CODEC;
-#endif
-#ifdef ZSTD_CODEC_AVAILABLE
-    } else if (it != metadata_.end() && toString(it->second) == 
AVRO_ZSTD_CODEC) {
-        codec_ = ZSTD_CODEC;
-#endif
+    if (it != metadata_.end()) {
+        const auto codecName = toString(it->second);
+        codec_ = getCodec(codecName);
+        if (!isCodecAvailable(codec_)) {
+            throw Exception("Codec {} is not available.", codecName);
+        }
     } else {
         codec_ = NULL_CODEC;
-        if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
-            throw Exception("Unknown codec in data file: " + 
toString(it->second));
-        }
     }
 
     avro::decode(*decoder_, sync_);
diff --git a/lang/c++/impl/ZstdCompressWrapper.cc 
b/lang/c++/impl/ZstdCompressWrapper.cc
index b756e19d8e..fecf335ef6 100644
--- a/lang/c++/impl/ZstdCompressWrapper.cc
+++ b/lang/c++/impl/ZstdCompressWrapper.cc
@@ -25,7 +25,7 @@
 
 namespace avro {
 
-std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> 
&uncompressed) {
+std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> 
&uncompressed, std::optional<int> compressionLevel) {
     // Pre-allocate buffer for compressed data
     size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
     if (ZSTD_isError(max_compressed_size)) {
@@ -37,7 +37,7 @@ std::vector<char> ZstdCompressWrapper::compress(const 
std::vector<char> &uncompr
     size_t compressed_size = ZSTD_compress(
         compressed.data(), max_compressed_size,
         uncompressed.data(), uncompressed.size(),
-        ZSTD_CLEVEL_DEFAULT);
+        compressionLevel.value_or(ZSTD_CLEVEL_DEFAULT));
 
     if (ZSTD_isError(compressed_size)) {
         throw Exception("ZSTD compression error: {}", 
ZSTD_getErrorName(compressed_size));
diff --git a/lang/c++/impl/ZstdCompressWrapper.hh 
b/lang/c++/impl/ZstdCompressWrapper.hh
index 871dd711a6..419fb26177 100644
--- a/lang/c++/impl/ZstdCompressWrapper.hh
+++ b/lang/c++/impl/ZstdCompressWrapper.hh
@@ -21,6 +21,7 @@
 
 #ifdef ZSTD_CODEC_AVAILABLE
 
+#include <optional>
 #include <vector>
 
 #include <zstd.h>
@@ -32,7 +33,7 @@ public:
     ZstdCompressWrapper();
     ~ZstdCompressWrapper();
 
-    std::vector<char> compress(const std::vector<char> &uncompressed);
+    std::vector<char> compress(const std::vector<char> &uncompressed, 
std::optional<int> compressionLevel = std::nullopt);
 
 private:
     ZSTD_CCtx *cctx_ = nullptr;
diff --git a/lang/c++/include/avro/DataFile.hh 
b/lang/c++/include/avro/DataFile.hh
index 27f9c43913..4a16a3bd5d 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -28,6 +28,7 @@
 
 #include <array>
 #include <map>
+#include <optional>
 #include <string>
 #include <vector>
 
@@ -37,17 +38,15 @@ namespace avro {
 enum Codec {
     NULL_CODEC = 0,
     DEFLATE_CODEC = 1,
-
-#ifdef SNAPPY_CODEC_AVAILABLE
     SNAPPY_CODEC = 2,
-#endif
-
-#ifdef ZSTD_CODEC_AVAILABLE
-    ZSTD_CODEC = 3,
-#endif
-
+    ZSTD_CODEC = 3
 };
 
+/**
+ * Returns true if the specified codec is available at runtime.
+ */
+AVRO_DECL bool isCodecAvailable(Codec codec);
+
 const int SyncSize = 16;
 /**
  * The sync value.
@@ -81,6 +80,7 @@ class AVRO_DECL DataFileWriterBase {
     const EncoderPtr encoderPtr_;
     const size_t syncInterval_;
     Codec codec_;
+    std::optional<int> compressionLevel_;
 
     std::unique_ptr<OutputStream> stream_;
     std::unique_ptr<OutputStream> buffer_;
@@ -134,10 +134,10 @@ public:
      */
     DataFileWriterBase(const char *filename, const ValidSchema &schema,
                        size_t syncInterval, Codec codec = NULL_CODEC,
-                       const Metadata &metadata = {});
+                       const Metadata &metadata = {}, std::optional<int> 
compressionLevel = std::nullopt);
     DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
                        const ValidSchema &schema, size_t syncInterval, Codec 
codec,
-                       const Metadata &metadata = {});
+                       const Metadata &metadata = {}, std::optional<int> 
compressionLevel = std::nullopt);
 
     DataFileWriterBase(const DataFileWriterBase &) = delete;
     DataFileWriterBase &operator=(const DataFileWriterBase &) = delete;
@@ -173,11 +173,13 @@ public:
      */
     DataFileWriter(const char *filename, const ValidSchema &schema,
                    size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
-                   const Metadata &metadata = {}) : 
base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, 
codec, metadata)) {}
+                   const Metadata &metadata = {}, std::optional<int> 
compressionLevel = std::nullopt)
+        : base_(std::make_unique<DataFileWriterBase>(filename, schema, 
syncInterval, codec, metadata, compressionLevel)) {}
 
     DataFileWriter(std::unique_ptr<OutputStream> outputStream, const 
ValidSchema &schema,
                    size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
-                   const Metadata &metadata = {}) : 
base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, 
syncInterval, codec, metadata)) {}
+                   const Metadata &metadata = {}, std::optional<int> 
compressionLevel = std::nullopt)
+        : base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), 
schema, syncInterval, codec, metadata, compressionLevel)) {}
 
     DataFileWriter(const DataFileWriter &) = delete;
     DataFileWriter &operator=(const DataFileWriter &) = delete;
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 3a44970c55..ed4e93c066 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -1284,6 +1284,219 @@ void testMetadataWithZstdCodec() {
 }
 #endif
 
+void testDeflateCompressionLevelValidation() {
+    BOOST_TEST_CHECKPOINT(__func__);
+
+    avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+    const char *filename = "test_deflate_level.df";
+
+    boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+    boost::random::uniform_int_distribution<> dist(-100, 100);
+
+    for (int i = 0; i < 100; ++i) {
+        int level = dist(rng);
+        bool isValidLevel = (level >= 0 && level <= 9);
+
+        if (isValidLevel) {
+            // Valid levels should succeed
+            BOOST_CHECK_NO_THROW({
+                avro::DataFileWriter<ComplexInteger> writer(
+                    filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, 
level);
+                writer.close();
+            });
+        } else {
+            // Invalid levels should throw
+            BOOST_CHECK_THROW({ avro::DataFileWriter<ComplexInteger> writer(
+                                    filename, schema, 16 * 1024, 
avro::DEFLATE_CODEC, {}, level); }, avro::Exception);
+        }
+    }
+
+    BOOST_CHECK_NO_THROW({
+        avro::DataFileWriter<ComplexInteger> writer(
+            filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, 
std::nullopt);
+        writer.close();
+    });
+
+    std::filesystem::remove(filename);
+}
+
+#ifdef ZSTD_CODEC_AVAILABLE
+void testZstdCompressionLevelValidation() {
+    BOOST_TEST_CHECKPOINT(__func__);
+
+    avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+    const char *filename = "test_zstd_level.df";
+
+    boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+    boost::random::uniform_int_distribution<> dist(-100, 100);
+
+    for (int i = 0; i < 100; ++i) {
+        int level = dist(rng);
+        bool isValidLevel = (level >= 1 && level <= 22);
+
+        if (isValidLevel) {
+            // Valid levels should succeed
+            BOOST_CHECK_NO_THROW({
+                avro::DataFileWriter<ComplexInteger> writer(
+                    filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level);
+                writer.close();
+            });
+        } else {
+            // Invalid levels should throw
+            BOOST_CHECK_THROW({ avro::DataFileWriter<ComplexInteger> writer(
+                                    filename, schema, 16 * 1024, 
avro::ZSTD_CODEC, {}, level); }, avro::Exception);
+        }
+    }
+
+    BOOST_CHECK_NO_THROW({
+        avro::DataFileWriter<ComplexInteger> writer(
+            filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, std::nullopt);
+        writer.close();
+    });
+
+    std::filesystem::remove(filename);
+}
+#endif
+
+void testDeflateCompressionRoundTrip() {
+    BOOST_TEST_CHECKPOINT(__func__);
+
+    avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+    const char *filename = "test_deflate_roundtrip.df";
+
+    boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+    boost::random::uniform_int_distribution<> levelDist(0, 10); // 0-9 valid, 
10 = nullopt
+    boost::random::uniform_int_distribution<> dataDist(1, 1000);
+
+    for (int i = 0; i < 100; ++i) {
+        int rawLevel = levelDist(rng);
+        std::optional<int> level = (rawLevel == 10) ? std::nullopt : 
std::optional<int>(rawLevel);
+        int numRecords = dataDist(rng) % 100 + 1;
+
+        std::vector<ComplexInteger> originalData;
+        int64_t re = rng();
+        int64_t im = rng();
+        for (int j = 0; j < numRecords; ++j) {
+            originalData.emplace_back(re, im);
+            re = re * 31 + im;
+            im = im * 17 + re;
+        }
+
+        // Write with compression level
+        {
+            avro::DataFileWriter<ComplexInteger> writer(
+                filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level);
+            for (const auto &record : originalData) {
+                writer.write(record);
+            }
+            writer.close();
+        }
+
+        // Read back and verify
+        {
+            avro::DataFileReader<ComplexInteger> reader(filename, schema);
+            std::vector<ComplexInteger> readData;
+            ComplexInteger record;
+            while (reader.read(record)) {
+                readData.push_back(record);
+            }
+
+            BOOST_CHECK_EQUAL(readData.size(), originalData.size());
+            for (size_t j = 0; j < originalData.size() && j < readData.size(); 
++j) {
+                BOOST_CHECK_EQUAL(readData[j].re, originalData[j].re);
+                BOOST_CHECK_EQUAL(readData[j].im, originalData[j].im);
+            }
+        }
+    }
+
+    std::filesystem::remove(filename);
+}
+
+#ifdef ZSTD_CODEC_AVAILABLE
+void testZstdCompressionRoundTrip() {
+    BOOST_TEST_CHECKPOINT(__func__);
+
+    avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+    const char *filename = "test_zstd_roundtrip.df";
+
+    boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+    // Valid ZSTD levels: 1-22
+    boost::random::uniform_int_distribution<> levelDist(0, 22); // 0 = 
nullopt, 1-22 = valid levels
+    boost::random::uniform_int_distribution<> dataDist(1, 1000);
+
+    for (int i = 0; i < 100; ++i) {
+        int rawLevel = levelDist(rng);
+        std::optional<int> level = (rawLevel == 0) ? std::nullopt : 
std::optional<int>(rawLevel);
+        int numRecords = dataDist(rng) % 100 + 1;
+
+        std::vector<ComplexInteger> originalData;
+        int64_t re = rng();
+        int64_t im = rng();
+        for (int j = 0; j < numRecords; ++j) {
+            originalData.emplace_back(re, im);
+            re = re * 31 + im;
+            im = im * 17 + re;
+        }
+
+        // Write with compression level
+        {
+            avro::DataFileWriter<ComplexInteger> writer(
+                filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level);
+            for (const auto &record : originalData) {
+                writer.write(record);
+            }
+            writer.close();
+        }
+
+        // Read back and verify
+        {
+            avro::DataFileReader<ComplexInteger> reader(filename, schema);
+            std::vector<ComplexInteger> readData;
+            ComplexInteger record;
+            while (reader.read(record)) {
+                readData.push_back(record);
+            }
+
+            BOOST_CHECK_EQUAL(readData.size(), originalData.size());
+            for (size_t j = 0; j < originalData.size() && j < readData.size(); 
++j) {
+                BOOST_CHECK_EQUAL(readData[j].re, originalData[j].re);
+                BOOST_CHECK_EQUAL(readData[j].im, originalData[j].im);
+            }
+        }
+    }
+
+    std::filesystem::remove(filename);
+}
+#endif
+
+void testCodecEnumValues() {
+    BOOST_TEST_CHECKPOINT(__func__);
+
+    BOOST_CHECK_EQUAL(static_cast<int>(avro::NULL_CODEC), 0);
+    BOOST_CHECK_EQUAL(static_cast<int>(avro::DEFLATE_CODEC), 1);
+    BOOST_CHECK_EQUAL(static_cast<int>(avro::SNAPPY_CODEC), 2);
+    BOOST_CHECK_EQUAL(static_cast<int>(avro::ZSTD_CODEC), 3);
+}
+
+void testIsCodecAvailable() {
+    BOOST_TEST_CHECKPOINT(__func__);
+
+    BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::NULL_CODEC), true);
+    BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::DEFLATE_CODEC), true);
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+    BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::SNAPPY_CODEC), true);
+#else
+    BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::SNAPPY_CODEC), false);
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+    BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::ZSTD_CODEC), true);
+#else
+    BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::ZSTD_CODEC), false);
+#endif
+}
+
 test_suite *
 init_unit_test_suite(int, char *[]) {
     {
@@ -1487,5 +1700,21 @@ init_unit_test_suite(int, char *[]) {
     
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithZstdCodec));
 #endif
 
+    // Codec enum and isCodecAvailable tests
+    
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCodecEnumValues));
+    
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testIsCodecAvailable));
+
+    // Compression level validation property tests
+    
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testDeflateCompressionLevelValidation));
+#ifdef ZSTD_CODEC_AVAILABLE
+    
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testZstdCompressionLevelValidation));
+#endif
+
+    // Compression round-trip property tests
+    
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testDeflateCompressionRoundTrip));
+#ifdef ZSTD_CODEC_AVAILABLE
+    
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testZstdCompressionRoundTrip));
+#endif
+
     return nullptr;
 }

Reply via email to