This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new 356efee046 AVRO-4132: [C++] Add ZSTD codec (#3364)
356efee046 is described below
commit 356efee046234004e77a8719d24b9ca03fa79aea
Author: Gang Wu <[email protected]>
AuthorDate: Sun Apr 27 14:58:19 2025 +0800
AVRO-4132: [C++] Add ZSTD codec (#3364)
* AVRO-4132: [C++] Add ZSTD codec
* assign value to the enum Codec
---
.github/workflows/test-lang-c++-ARM.yml | 2 +-
.github/workflows/test-lang-c++.yml | 2 +-
lang/c++/CMakeLists.txt | 16 +++++-
lang/c++/impl/DataFile.cc | 90 +++++++++++++++++++++++++++++++++
lang/c++/include/avro/DataFile.hh | 10 ++--
lang/c++/test/DataFileTests.cc | 82 ++++++++++++++++++++++++++++++
6 files changed, 196 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/test-lang-c++-ARM.yml b/.github/workflows/test-lang-c++-ARM.yml
index 759065b089..a34354e20f 100644
--- a/.github/workflows/test-lang-c++-ARM.yml
+++ b/.github/workflows/test-lang-c++-ARM.yml
@@ -44,7 +44,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update -q
- sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
+ sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev libzstd-dev cmake
- name: Build
run: |
diff --git a/.github/workflows/test-lang-c++.yml b/.github/workflows/test-lang-c++.yml
index c0c66ceec1..2104c2d26c 100644
--- a/.github/workflows/test-lang-c++.yml
+++ b/.github/workflows/test-lang-c++.yml
@@ -39,7 +39,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install Dependencies
- run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake
+ run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev libzstd-dev cmake
- name: Print Versions
run: |
diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index dacde31840..d4586558a3 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -98,6 +98,16 @@ else ()
message("Disabled snappy codec.")
endif ()
+find_package(zstd CONFIG)
+if(zstd_FOUND)
+ message("Enabled zstd codec, version: ${zstd_VERSION}")
+ set(ZSTD_TARGET $<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>)
+ add_definitions(-DZSTD_CODEC_AVAILABLE)
+else()
+ message("Disabled zstd codec.")
+ set(ZSTD_TARGET "")
+endif()
+
# FindZLIB guarantees that ZLIB::ZLIB target exists if found
# See https://cmake.org/cmake/help/latest/module/FindZLIB.html#imported-targets
find_package(ZLIB REQUIRED)
@@ -135,18 +145,22 @@ target_link_libraries(avrocpp PUBLIC
$<BUILD_INTERFACE:fmt::fmt-header-only>
$<BUILD_INTERFACE:ZLIB::ZLIB>
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+ $<$<BOOL:${zstd_FOUND}>:$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
$<INSTALL_INTERFACE:ZLIB::ZLIB>
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+ $<$<BOOL:${zstd_FOUND}>:$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
)
target_link_libraries(avrocpp_s PUBLIC
$<BUILD_INTERFACE:fmt::fmt-header-only>
$<BUILD_INTERFACE:ZLIB::ZLIB>
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+ $<$<BOOL:${zstd_FOUND}>:$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
$<INSTALL_INTERFACE:ZLIB::ZLIB>
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
+ $<$<BOOL:${zstd_FOUND}>:$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
)
@@ -205,7 +219,7 @@ if (AVRO_BUILD_TESTS)
macro (unittest name)
add_executable (${name} test/${name}.cc)
- target_link_libraries (${name} avrocpp_s Boost::system ZLIB::ZLIB $<TARGET_NAME_IF_EXISTS:Snappy::snappy>)
+ target_link_libraries (${name} avrocpp_s Boost::system ZLIB::ZLIB $<TARGET_NAME_IF_EXISTS:Snappy::snappy> $<$<BOOL:${zstd_FOUND}>:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>)
add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
endmacro (unittest)
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index fb0eaa2ac8..f633b959ca 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -27,6 +27,10 @@
#include <snappy.h>
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+#include <zstd.h>
+#endif
+
#include <zlib.h>
namespace avro {
@@ -49,6 +53,10 @@ const string AVRO_DEFLATE_CODEC("deflate");
const string AVRO_SNAPPY_CODEC = "snappy";
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+const string AVRO_ZSTD_CODEC = "zstd";
+#endif
+
const size_t minSyncInterval = 32;
const size_t maxSyncInterval = 1u << 30;
@@ -100,6 +108,10 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
#ifdef SNAPPY_CODEC_AVAILABLE
} else if (codec_ == SNAPPY_CODEC) {
setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ } else if (codec_ == ZSTD_CODEC) {
+ setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
#endif
} else {
throw Exception("Unknown codec: {}", int(codec));
@@ -216,6 +228,39 @@ void DataFileWriterBase::sync() {
avro::encode(*encoderPtr_, byteCount);
encoderPtr_->flush();
copy(*in, *stream_);
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ } else if (codec_ == ZSTD_CODEC) {
+ // Read all uncompressed data into a single buffer
+ std::vector<char> uncompressed;
+ const uint8_t *data;
+ size_t len;
+ std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
+ while (input->next(&data, &len)) {
+ uncompressed.insert(uncompressed.end(), reinterpret_cast<const char *>(data),
+ reinterpret_cast<const char *>(data) + len);
+ }
+
+ // Pre-allocate buffer for compressed data
+ size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
+ std::vector<char> compressed(max_compressed_size);
+
+ // Compress the data into a single ZSTD frame using the one-shot simple API
+ size_t compressed_size = ZSTD_compress(
+ compressed.data(), max_compressed_size,
+ uncompressed.data(), uncompressed.size(),
+ ZSTD_CLEVEL_DEFAULT);
+
+ if (ZSTD_isError(compressed_size)) {
+ throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
+ }
+
+ compressed.resize(compressed_size);
+ std::unique_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
+ avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
+ encoderPtr_->flush();
+ copy(*in, *stream_);
#endif
}
@@ -431,6 +476,46 @@ void DataFileReaderBase::readDataBlock() {
dataDecoder_->init(*in);
dataStream_ = std::move(in);
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ } else if (codec_ == ZSTD_CODEC) {
+ compressed_.clear();
+ const uint8_t *data;
+ size_t len;
+ while (st->next(&data, &len)) {
+ compressed_.insert(compressed_.end(), data, data + len);
+ }
+
+ // Get the decompressed size
+ size_t decompressed_size = ZSTD_getFrameContentSize(
+ reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
+ if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
+ throw Exception("ZSTD: Not a valid compressed frame");
+ } else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
+ throw Exception("ZSTD: Unable to determine decompressed size");
+ }
+
+ // Decompress the data
+ uncompressed.clear();
+ uncompressed.resize(decompressed_size);
+ size_t result = ZSTD_decompress(
+ uncompressed.data(), decompressed_size,
+ reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
+
+ if (ZSTD_isError(result)) {
+ throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
+ }
+ if (result != decompressed_size) {
+ throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
+ decompressed_size, result);
+ }
+
+ std::unique_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t *>(uncompressed.data()),
+ uncompressed.size());
+
+ dataDecoder_->init(*in);
+ dataStream_ = std::move(in);
+#endif
} else {
compressed_.clear();
uncompressed.clear();
@@ -530,6 +615,11 @@ void DataFileReaderBase::readHeader() {
} else if (it != metadata_.end()
&& toString(it->second) == AVRO_SNAPPY_CODEC) {
codec_ = SNAPPY_CODEC;
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ } else if (it != metadata_.end()
+ && toString(it->second) == AVRO_ZSTD_CODEC) {
+ codec_ = ZSTD_CODEC;
#endif
} else {
codec_ = NULL_CODEC;
diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh
index 6b2cdab5c1..8ff5d88c61 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -35,11 +35,15 @@ namespace avro {
/** Specify type of compression to use when writing data files. */
enum Codec {
- NULL_CODEC,
- DEFLATE_CODEC,
+ NULL_CODEC = 0,
+ DEFLATE_CODEC = 1,
#ifdef SNAPPY_CODEC_AVAILABLE
- SNAPPY_CODEC
+ SNAPPY_CODEC = 2,
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+ ZSTD_CODEC = 3,
#endif
};
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index c360a07dbb..07a6b6b9a3 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -216,6 +216,12 @@ public:
}
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ void testWriteWithZstdCodec() {
+ testWriteWithCodec(avro::ZSTD_CODEC);
+ }
+#endif
+
void testWriteWithCodec(avro::Codec codec) {
avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100,
codec);
int64_t re = 3;
@@ -618,6 +624,39 @@ public:
}
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ void testZstd() {
+ // Add enough objects to span multiple blocks
+ const size_t number_of_objects = 1000000;
+ // first create a large file
+ ValidSchema dschema = avro::compileJsonSchemaFromString(sch);
+ {
+ avro::DataFileWriter<ComplexInteger> writer(
+ filename, dschema, 16 * 1024, avro::ZSTD_CODEC);
+
+ for (size_t i = 0; i < number_of_objects; ++i) {
+ ComplexInteger d;
+ d.re = i;
+ d.im = 2 * i;
+ writer.write(d);
+ }
+ }
+ {
+ avro::DataFileReader<ComplexInteger> reader(filename, dschema);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ std::vector<int64_t> found;
+ ComplexInteger record;
+ while (reader.read(record)) {
+ found.push_back(record.re);
+ }
+ BOOST_CHECK_EQUAL(found.size(), number_of_objects);
+ for (unsigned int i = 0; i < found.size(); ++i) {
+ BOOST_CHECK_EQUAL(found[i], i);
+ }
+ }
+ }
+#endif
+
void testSchemaReadWrite() {
uint32_t a = 42;
{
@@ -792,6 +831,13 @@ void testSkipStringSnappyCodec() {
}
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+void testSkipStringZstdCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testSkipString(avro::ZSTD_CODEC);
+}
+#endif
+
struct TestRecord {
std::string s1;
int64_t id;
@@ -1005,6 +1051,13 @@ void testLastSyncSnappyCodec() {
}
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+void testLastSyncZstdCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testLastSync(avro::ZSTD_CODEC);
+}
+#endif
+
void testReadRecordEfficientlyUsingLastSyncNullCodec() {
BOOST_TEST_CHECKPOINT(__func__);
testReadRecordEfficientlyUsingLastSync(avro::NULL_CODEC);
@@ -1022,6 +1075,13 @@ void testReadRecordEfficientlyUsingLastSyncSnappyCodec() {
}
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+void testReadRecordEfficientlyUsingLastSyncZstdCodec() {
+ BOOST_TEST_CHECKPOINT(__func__);
+ testReadRecordEfficientlyUsingLastSync(avro::ZSTD_CODEC);
+}
+#endif
+
test_suite *
init_unit_test_suite(int, char *[]) {
{
@@ -1055,6 +1115,16 @@ init_unit_test_suite(int, char *[]) {
addReaderTests(ts, t1);
boost::unit_test::framework::master_test_suite().add(ts);
}
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ {
+ auto *ts = BOOST_TEST_SUITE("DataFile tests: test1.zstd.df");
+ shared_ptr<DataFileTest> t1(new DataFileTest("test1.zstd.df", sch, isch));
+ ts->add(BOOST_CLASS_TEST_CASE(
+ &DataFileTest::testWriteWithZstdCodec, t1));
+ addReaderTests(ts, t1);
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
#endif
{
auto *ts = BOOST_TEST_SUITE("DataFile tests: test2.df");
@@ -1101,6 +1171,9 @@ init_unit_test_suite(int, char *[]) {
shared_ptr<DataFileTest> t8(new DataFileTest("test8.df", dsch,
dblsch));
#ifdef SNAPPY_CODEC_AVAILABLE
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8));
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZstd, t8));
#endif
boost::unit_test::framework::master_test_suite().add(ts);
}
@@ -1165,18 +1238,27 @@ init_unit_test_suite(int, char *[]) {
#ifdef SNAPPY_CODEC_AVAILABLE
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringSnappyCodec));
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringZstdCodec));
+#endif
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncNullCodec));
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
#ifdef SNAPPY_CODEC_AVAILABLE
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncSnappyCodec));
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncZstdCodec));
+#endif
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncNullCodec));
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncDeflateCodec));
#ifdef SNAPPY_CODEC_AVAILABLE
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncSnappyCodec));
#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncZstdCodec));
+#endif
return nullptr;
}