This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new 11fb55500b AVRO-4222: [C++] Support writer to specify compression level (#3610)
11fb55500b is described below
commit 11fb55500bed9fbe9af53b85112cd13887f0ce80
Author: Gang Wu <[email protected]>
AuthorDate: Fri Jan 9 14:31:45 2026 +0800
AVRO-4222: [C++] Support writer to specify compression level (#3610)
* AVRO-4222: [C++] Support writer to specify compression level
* address feedback
---
lang/c++/impl/DataFile.cc | 240 +++++++++++++++++++++++++----------
lang/c++/impl/ZstdCompressWrapper.cc | 4 +-
lang/c++/impl/ZstdCompressWrapper.hh | 3 +-
lang/c++/include/avro/DataFile.hh | 26 ++--
lang/c++/test/DataFileTests.cc | 229 +++++++++++++++++++++++++++++++++
5 files changed, 423 insertions(+), 79 deletions(-)
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index e605a6f448..3a93a5fdf5 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -47,53 +47,183 @@ using std::array;
namespace {
const string AVRO_SCHEMA_KEY("avro.schema");
const string AVRO_CODEC_KEY("avro.codec");
-const string AVRO_NULL_CODEC("null");
-const string AVRO_DEFLATE_CODEC("deflate");
+const size_t minSyncInterval = 32;
+const size_t maxSyncInterval = 1u << 30;
+
+// Recommended by https://www.zlib.net/zlib_how.html
+const size_t zlibBufGrowSize = 128 * 1024;
+
+template<Codec codec>
+struct codec_trait {
+ static std::string name() {
+ throw Exception("Unsupported codec: {}", static_cast<int>(codec));
+ }
+ static void validate(std::optional<int> level) {
+ throw Exception("Unsupported codec: {}", static_cast<int>(codec));
+ }
+ static bool available() {
+ throw Exception("Unsupported codec: {}", static_cast<int>(codec));
+ }
+};
+
+template<>
+struct codec_trait<NULL_CODEC> {
+ static std::string name() {
+ return "null";
+ }
+ static void validate(std::optional<int> /*level*/) {}
+ static bool available() {
+ return true;
+ }
+};
+
+template<>
+struct codec_trait<DEFLATE_CODEC> {
+ static std::string name() {
+ return "deflate";
+ }
+
+ static void validate(std::optional<int> level) {
+ if (!level.has_value()) {
+ return;
+ }
+ int levelValue = level.value();
+ if (levelValue < 0 || levelValue > 9) {
+ throw Exception("Invalid compression level {} for deflate codec. "
+ "Valid range is 0-9.",
+ levelValue);
+ }
+ }
+
+ static bool available() {
+ return true;
+ }
+};
+
+template<>
+struct codec_trait<SNAPPY_CODEC> {
+ static std::string name() {
+ return "snappy";
+ }
+
+ static void validate(std::optional<int> /*level*/) {
+ }
+
+ static bool available() {
#ifdef SNAPPY_CODEC_AVAILABLE
-const string AVRO_SNAPPY_CODEC = "snappy";
+ return true;
+#else
+ return false;
#endif
+ }
+};
+
+template<>
+struct codec_trait<ZSTD_CODEC> {
+ static std::string name() {
+ return "zstandard";
+ }
+ static void validate(std::optional<int> level) {
+ if (!level.has_value()) {
+ return;
+ }
+ int levelValue = level.value();
+ if (levelValue < 1 || levelValue > 22) {
+ throw Exception("Invalid compression level {} for zstandard codec. "
+ "Valid range is 1-22.",
+ levelValue);
+ }
+ }
+
+ static bool available() {
#ifdef ZSTD_CODEC_AVAILABLE
-const string AVRO_ZSTD_CODEC = "zstandard";
+ return true;
+#else
+ return false;
#endif
+ }
+};
-const size_t minSyncInterval = 32;
-const size_t maxSyncInterval = 1u << 30;
+#define DISPATCH_CODEC_FUNC(codec, func, ...) \
+ switch (codec) { \
+ case NULL_CODEC: \
+ return codec_trait<NULL_CODEC>::func(__VA_ARGS__); \
+ case DEFLATE_CODEC: \
+ return codec_trait<DEFLATE_CODEC>::func(__VA_ARGS__); \
+ case SNAPPY_CODEC: \
+ return codec_trait<SNAPPY_CODEC>::func(__VA_ARGS__); \
+ case ZSTD_CODEC: \
+ return codec_trait<ZSTD_CODEC>::func(__VA_ARGS__); \
+ default: \
+ throw Exception("Unknown codec: {}", static_cast<int>(codec)); \
+ }
-// Recommended by https://www.zlib.net/zlib_how.html
-const size_t zlibBufGrowSize = 128 * 1024;
+std::string getCodecName(Codec codec) {
+ DISPATCH_CODEC_FUNC(codec, name);
+}
+
+void validateCodec(Codec codec, std::optional<int> level) {
+ if (!isCodecAvailable(codec)) {
+ throw Exception("Codec {} is not available.", getCodecName(codec));
+ }
+ DISPATCH_CODEC_FUNC(codec, validate, level);
+}
+
+Codec getCodec(const std::string &name) {
+ if (name == codec_trait<NULL_CODEC>::name()) {
+ return NULL_CODEC;
+ } else if (name == codec_trait<DEFLATE_CODEC>::name()) {
+ return DEFLATE_CODEC;
+ } else if (name == codec_trait<SNAPPY_CODEC>::name()) {
+ return SNAPPY_CODEC;
+ } else if (name == codec_trait<ZSTD_CODEC>::name()) {
+ return ZSTD_CODEC;
+ } else {
+ throw Exception("Unknown codec name: {}", name);
+ }
+}
} // namespace
+bool isCodecAvailable(Codec codec) {
+ DISPATCH_CODEC_FUNC(codec, available);
+}
+
+#undef DISPATCH_CODEC_FUNC
+
DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
- Codec codec, const Metadata &metadata) : filename_(filename),
- schema_(schema),
- encoderPtr_(binaryEncoder()),
- syncInterval_(syncInterval),
- codec_(codec),
- stream_(fileOutputStream(filename)),
- buffer_(memoryOutputStream()),
- sync_(makeSync()),
- objectCount_(0),
- metadata_(metadata),
- lastSync_(0) {
+ Codec codec, const Metadata &metadata,
+ std::optional<int> compressionLevel) : filename_(filename),
+ schema_(schema),
+ encoderPtr_(binaryEncoder()),
+ syncInterval_(syncInterval),
+ codec_(codec),
+ compressionLevel_(compressionLevel),
+ stream_(fileOutputStream(filename)),
+ buffer_(memoryOutputStream()),
+ sync_(makeSync()),
+ objectCount_(0),
+ metadata_(metadata),
+ lastSync_(0) {
init(schema, syncInterval, codec);
}
-DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
- const ValidSchema &schema, size_t syncInterval,
- Codec codec, const Metadata &metadata) : filename_(),
- schema_(schema),
- encoderPtr_(binaryEncoder()),
- syncInterval_(syncInterval),
- codec_(codec),
- stream_(std::move(outputStream)),
- buffer_(memoryOutputStream()),
- sync_(makeSync()),
- objectCount_(0),
- metadata_(metadata),
- lastSync_(0) {
+DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
+ size_t syncInterval, Codec codec, const Metadata &metadata,
+ std::optional<int> compressionLevel) : filename_(),
+ schema_(schema),
+ encoderPtr_(binaryEncoder()),
+ syncInterval_(syncInterval),
+ codec_(codec),
+ compressionLevel_(compressionLevel),
+ stream_(std::move(outputStream)),
+ buffer_(memoryOutputStream()),
+ sync_(makeSync()),
+ objectCount_(0),
+ metadata_(metadata),
+ lastSync_(0) {
init(schema, syncInterval, codec);
}
@@ -103,23 +233,9 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
"Invalid sync interval: {}. Should be between {} and {}",
syncInterval, minSyncInterval, maxSyncInterval);
}
- setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
- if (codec_ == NULL_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
- } else if (codec_ == DEFLATE_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
-#ifdef SNAPPY_CODEC_AVAILABLE
- } else if (codec_ == SNAPPY_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
-#endif
-#ifdef ZSTD_CODEC_AVAILABLE
- } else if (codec_ == ZSTD_CODEC) {
- setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
-#endif
- } else {
- throw Exception("Unknown codec: {}", int(codec));
- }
+ validateCodec(codec, compressionLevel_);
+ setMetadata(AVRO_CODEC_KEY, getCodecName(codec));
setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
writeHeader();
@@ -160,7 +276,10 @@ void DataFileWriterBase::sync() {
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
- int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
+ // Use Z_DEFAULT_COMPRESSION if no level specified
+ int effectiveLevel = compressionLevel_.value_or(Z_DEFAULT_COMPRESSION);
+
+ int ret = deflateInit2(&zs, effectiveLevel, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
if (ret != Z_OK) {
throw Exception("Failed to initialize deflate, error: {}", ret);
}
@@ -246,7 +365,7 @@ void DataFileWriterBase::sync() {
}
ZstdCompressWrapper zstdCompressWrapper;
- std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed);
+ std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed, compressionLevel_);
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(compressed.data()),
compressed.size());
@@ -580,23 +699,16 @@ void DataFileReaderBase::readHeader() {
readerSchema_ = dataSchema();
}
+ // Parse codec from metadata using codec_trait
it = metadata_.find(AVRO_CODEC_KEY);
- if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
- codec_ = DEFLATE_CODEC;
-#ifdef SNAPPY_CODEC_AVAILABLE
- } else if (it != metadata_.end()
- && toString(it->second) == AVRO_SNAPPY_CODEC) {
- codec_ = SNAPPY_CODEC;
-#endif
-#ifdef ZSTD_CODEC_AVAILABLE
- } else if (it != metadata_.end() && toString(it->second) == AVRO_ZSTD_CODEC) {
- codec_ = ZSTD_CODEC;
-#endif
+ if (it != metadata_.end()) {
+ const auto codecName = toString(it->second);
+ codec_ = getCodec(codecName);
+ if (!isCodecAvailable(codec_)) {
+ throw Exception("Codec {} is not available.", codecName);
+ }
} else {
codec_ = NULL_CODEC;
- if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
- throw Exception("Unknown codec in data file: " + toString(it->second));
- }
}
avro::decode(*decoder_, sync_);
diff --git a/lang/c++/impl/ZstdCompressWrapper.cc b/lang/c++/impl/ZstdCompressWrapper.cc
index b756e19d8e..fecf335ef6 100644
--- a/lang/c++/impl/ZstdCompressWrapper.cc
+++ b/lang/c++/impl/ZstdCompressWrapper.cc
@@ -25,7 +25,7 @@
namespace avro {
-std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed) {
+std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed, std::optional<int> compressionLevel) {
// Pre-allocate buffer for compressed data
size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
if (ZSTD_isError(max_compressed_size)) {
@@ -37,7 +37,7 @@ std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompr
size_t compressed_size = ZSTD_compress(
compressed.data(), max_compressed_size,
uncompressed.data(), uncompressed.size(),
- ZSTD_CLEVEL_DEFAULT);
+ compressionLevel.value_or(ZSTD_CLEVEL_DEFAULT));
if (ZSTD_isError(compressed_size)) {
throw Exception("ZSTD compression error: {}",
ZSTD_getErrorName(compressed_size));
diff --git a/lang/c++/impl/ZstdCompressWrapper.hh b/lang/c++/impl/ZstdCompressWrapper.hh
index 871dd711a6..419fb26177 100644
--- a/lang/c++/impl/ZstdCompressWrapper.hh
+++ b/lang/c++/impl/ZstdCompressWrapper.hh
@@ -21,6 +21,7 @@
#ifdef ZSTD_CODEC_AVAILABLE
+#include <optional>
#include <vector>
#include <zstd.h>
@@ -32,7 +33,7 @@ public:
ZstdCompressWrapper();
~ZstdCompressWrapper();
- std::vector<char> compress(const std::vector<char> &uncompressed);
+ std::vector<char> compress(const std::vector<char> &uncompressed, std::optional<int> compressionLevel = std::nullopt);
private:
ZSTD_CCtx *cctx_ = nullptr;
diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh
index 27f9c43913..4a16a3bd5d 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -28,6 +28,7 @@
#include <array>
#include <map>
+#include <optional>
#include <string>
#include <vector>
@@ -37,17 +38,15 @@ namespace avro {
enum Codec {
NULL_CODEC = 0,
DEFLATE_CODEC = 1,
-
-#ifdef SNAPPY_CODEC_AVAILABLE
SNAPPY_CODEC = 2,
-#endif
-
-#ifdef ZSTD_CODEC_AVAILABLE
- ZSTD_CODEC = 3,
-#endif
-
+ ZSTD_CODEC = 3
};
+/**
+ * Returns true if the specified codec is available at runtime.
+ */
+AVRO_DECL bool isCodecAvailable(Codec codec);
+
const int SyncSize = 16;
/**
* The sync value.
@@ -81,6 +80,7 @@ class AVRO_DECL DataFileWriterBase {
const EncoderPtr encoderPtr_;
const size_t syncInterval_;
Codec codec_;
+ std::optional<int> compressionLevel_;
std::unique_ptr<OutputStream> stream_;
std::unique_ptr<OutputStream> buffer_;
@@ -134,10 +134,10 @@ public:
*/
DataFileWriterBase(const char *filename, const ValidSchema &schema,
size_t syncInterval, Codec codec = NULL_CODEC,
- const Metadata &metadata = {});
+ const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt);
DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
const ValidSchema &schema, size_t syncInterval, Codec codec,
- const Metadata &metadata = {});
+ const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt);
DataFileWriterBase(const DataFileWriterBase &) = delete;
DataFileWriterBase &operator=(const DataFileWriterBase &) = delete;
@@ -173,11 +173,13 @@ public:
*/
DataFileWriter(const char *filename, const ValidSchema &schema,
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
- const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata)) {}
+ const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt)
+ : base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata, compressionLevel)) {}
DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
- const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata)) {}
+ const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt)
+ : base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata, compressionLevel)) {}
DataFileWriter(const DataFileWriter &) = delete;
DataFileWriter &operator=(const DataFileWriter &) = delete;
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 3a44970c55..ed4e93c066 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -1284,6 +1284,219 @@ void testMetadataWithZstdCodec() {
}
#endif
+void testDeflateCompressionLevelValidation() {
+ BOOST_TEST_CHECKPOINT(__func__);
+
+ avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+ const char *filename = "test_deflate_level.df";
+
+ boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+ boost::random::uniform_int_distribution<> dist(-100, 100);
+
+ for (int i = 0; i < 100; ++i) {
+ int level = dist(rng);
+ bool isValidLevel = (level >= 0 && level <= 9);
+
+ if (isValidLevel) {
+ // Valid levels should succeed
+ BOOST_CHECK_NO_THROW({
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level);
+ writer.close();
+ });
+ } else {
+ // Invalid levels should throw
+ BOOST_CHECK_THROW({ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level); }, avro::Exception);
+ }
+ }
+
+ BOOST_CHECK_NO_THROW({
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, std::nullopt);
+ writer.close();
+ });
+
+ std::filesystem::remove(filename);
+}
+
+#ifdef ZSTD_CODEC_AVAILABLE
+void testZstdCompressionLevelValidation() {
+ BOOST_TEST_CHECKPOINT(__func__);
+
+ avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+ const char *filename = "test_zstd_level.df";
+
+ boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+ boost::random::uniform_int_distribution<> dist(-100, 100);
+
+ for (int i = 0; i < 100; ++i) {
+ int level = dist(rng);
+ bool isValidLevel = (level >= 1 && level <= 22);
+
+ if (isValidLevel) {
+ // Valid levels should succeed
+ BOOST_CHECK_NO_THROW({
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level);
+ writer.close();
+ });
+ } else {
+ // Invalid levels should throw
+ BOOST_CHECK_THROW({ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level); }, avro::Exception);
+ }
+ }
+
+ BOOST_CHECK_NO_THROW({
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, std::nullopt);
+ writer.close();
+ });
+
+ std::filesystem::remove(filename);
+}
+#endif
+
+void testDeflateCompressionRoundTrip() {
+ BOOST_TEST_CHECKPOINT(__func__);
+
+ avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+ const char *filename = "test_deflate_roundtrip.df";
+
+ boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+ boost::random::uniform_int_distribution<> levelDist(0, 10); // 0-9 valid, 10 = nullopt
+ boost::random::uniform_int_distribution<> dataDist(1, 1000);
+
+ for (int i = 0; i < 100; ++i) {
+ int rawLevel = levelDist(rng);
+ std::optional<int> level = (rawLevel == 10) ? std::nullopt : std::optional<int>(rawLevel);
+ int numRecords = dataDist(rng) % 100 + 1;
+
+ std::vector<ComplexInteger> originalData;
+ int64_t re = rng();
+ int64_t im = rng();
+ for (int j = 0; j < numRecords; ++j) {
+ originalData.emplace_back(re, im);
+ re = re * 31 + im;
+ im = im * 17 + re;
+ }
+
+ // Write with compression level
+ {
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::DEFLATE_CODEC, {}, level);
+ for (const auto &record : originalData) {
+ writer.write(record);
+ }
+ writer.close();
+ }
+
+ // Read back and verify
+ {
+ avro::DataFileReader<ComplexInteger> reader(filename, schema);
+ std::vector<ComplexInteger> readData;
+ ComplexInteger record;
+ while (reader.read(record)) {
+ readData.push_back(record);
+ }
+
+ BOOST_CHECK_EQUAL(readData.size(), originalData.size());
+ for (size_t j = 0; j < originalData.size() && j < readData.size(); ++j) {
+ BOOST_CHECK_EQUAL(readData[j].re, originalData[j].re);
+ BOOST_CHECK_EQUAL(readData[j].im, originalData[j].im);
+ }
+ }
+ }
+
+ std::filesystem::remove(filename);
+}
+
+#ifdef ZSTD_CODEC_AVAILABLE
+void testZstdCompressionRoundTrip() {
+ BOOST_TEST_CHECKPOINT(__func__);
+
+ avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);
+ const char *filename = "test_zstd_roundtrip.df";
+
+ boost::mt19937 rng(static_cast<uint32_t>(time(nullptr)));
+ // Valid ZSTD levels: 1-22
+ boost::random::uniform_int_distribution<> levelDist(0, 22); // 0 = nullopt, 1-22 = valid levels
+ boost::random::uniform_int_distribution<> dataDist(1, 1000);
+
+ for (int i = 0; i < 100; ++i) {
+ int rawLevel = levelDist(rng);
+ std::optional<int> level = (rawLevel == 0) ? std::nullopt : std::optional<int>(rawLevel);
+ int numRecords = dataDist(rng) % 100 + 1;
+
+ std::vector<ComplexInteger> originalData;
+ int64_t re = rng();
+ int64_t im = rng();
+ for (int j = 0; j < numRecords; ++j) {
+ originalData.emplace_back(re, im);
+ re = re * 31 + im;
+ im = im * 17 + re;
+ }
+
+ // Write with compression level
+ {
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, schema, 16 * 1024, avro::ZSTD_CODEC, {}, level);
+ for (const auto &record : originalData) {
+ writer.write(record);
+ }
+ writer.close();
+ }
+
+ // Read back and verify
+ {
+ avro::DataFileReader<ComplexInteger> reader(filename, schema);
+ std::vector<ComplexInteger> readData;
+ ComplexInteger record;
+ while (reader.read(record)) {
+ readData.push_back(record);
+ }
+
+ BOOST_CHECK_EQUAL(readData.size(), originalData.size());
+ for (size_t j = 0; j < originalData.size() && j < readData.size(); ++j) {
+ BOOST_CHECK_EQUAL(readData[j].re, originalData[j].re);
+ BOOST_CHECK_EQUAL(readData[j].im, originalData[j].im);
+ }
+ }
+ }
+
+ std::filesystem::remove(filename);
+}
+#endif
+
+void testCodecEnumValues() {
+ BOOST_TEST_CHECKPOINT(__func__);
+
+ BOOST_CHECK_EQUAL(static_cast<int>(avro::NULL_CODEC), 0);
+ BOOST_CHECK_EQUAL(static_cast<int>(avro::DEFLATE_CODEC), 1);
+ BOOST_CHECK_EQUAL(static_cast<int>(avro::SNAPPY_CODEC), 2);
+ BOOST_CHECK_EQUAL(static_cast<int>(avro::ZSTD_CODEC), 3);
+}
+
+void testIsCodecAvailable() {
+ BOOST_TEST_CHECKPOINT(__func__);
+
+ BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::NULL_CODEC), true);
+ BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::DEFLATE_CODEC), true);
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+ BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::SNAPPY_CODEC), true);
+#else
+ BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::SNAPPY_CODEC), false);
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+ BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::ZSTD_CODEC), true);
+#else
+ BOOST_CHECK_EQUAL(avro::isCodecAvailable(avro::ZSTD_CODEC), false);
+#endif
+}
+
test_suite *
init_unit_test_suite(int, char *[]) {
{
@@ -1487,5 +1700,21 @@ init_unit_test_suite(int, char *[]) {
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithZstdCodec));
#endif
+ // Codec enum and isCodecAvailable tests
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCodecEnumValues));
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testIsCodecAvailable));
+
+ // Compression level validation property tests
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testDeflateCompressionLevelValidation));
+#ifdef ZSTD_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testZstdCompressionLevelValidation));
+#endif
+
+ // Compression round-trip property tests
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testDeflateCompressionRoundTrip));
+#ifdef ZSTD_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testZstdCompressionRoundTrip));
+#endif
+#endif
+
return nullptr;
}