This is an automated email from the ASF dual-hosted git repository.
thiru pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new 54b3321615 AVRO-4172: [C++] Fix ZSTD codec compatibility (#3457)
54b3321615 is described below
commit 54b332161524086dcb6cde8afe097097eed7f3ee
Author: Zhang Jiawei <[email protected]>
AuthorDate: Thu Oct 16 00:56:21 2025 +0800
AVRO-4172: [C++] Fix ZSTD codec compatibility (#3457)
* AVRO-4172: [C++] Fix ZSTD codec compatibility
* AVRO-4172: [C++] add codec compatibility test
* fix
* fix
* fix
* add ZstdCodecWrapper for compression
* fix
* fix
* split zstd compress and decompress wrapper
* fix
---
lang/c++/CMakeLists.txt | 2 +
lang/c++/impl/DataFile.cc | 50 ++++----------------
lang/c++/impl/ZstdCompressWrapper.cc | 62 ++++++++++++++++++++++++
lang/c++/impl/ZstdCompressWrapper.hh | 45 ++++++++++++++++++
lang/c++/impl/ZstdDecompressWrapper.cc | 78 ++++++++++++++++++++++++++++++
lang/c++/impl/ZstdDecompressWrapper.hh | 46 ++++++++++++++++++
lang/c++/test/DataFileTests.cc | 84 +++++++++++++++++++++++++++++++++
share/test/data/weather-deflate.avro | Bin 0 -> 319 bytes
share/test/data/weather-zstd.avro | Bin 0 -> 333 bytes
9 files changed, 326 insertions(+), 41 deletions(-)
diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index 07eefe0b26..03f4dfb8ab 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -129,6 +129,8 @@ set (AVRO_SOURCE_FILES
impl/Stream.cc impl/FileStream.cc
impl/Generic.cc impl/GenericDatum.cc
impl/DataFile.cc
+ impl/ZstdCompressWrapper.cc
+ impl/ZstdDecompressWrapper.cc
impl/parsing/Symbol.cc
impl/parsing/ValidatingCodec.cc
impl/parsing/JsonCodec.cc
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index 5bf27c1ab7..e605a6f448 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -28,7 +28,8 @@
#endif
#ifdef ZSTD_CODEC_AVAILABLE
-#include <zstd.h>
+#include "ZstdCompressWrapper.hh"
+#include "ZstdDecompressWrapper.hh"
#endif
#include <zlib.h>
@@ -244,24 +245,12 @@ void DataFileWriterBase::sync() {
reinterpret_cast<const char *>(data) + len);
}
- // Pre-allocate buffer for compressed data
- size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
- std::vector<char> compressed(max_compressed_size);
+ ZstdCompressWrapper zstdCompressWrapper;
+ std::vector<char> compressed =
zstdCompressWrapper.compress(uncompressed);
- // Compress the data using ZSTD block API
- size_t compressed_size = ZSTD_compress(
- compressed.data(), max_compressed_size,
- uncompressed.data(), uncompressed.size(),
- ZSTD_CLEVEL_DEFAULT);
-
- if (ZSTD_isError(compressed_size)) {
- throw Exception("ZSTD compression error: {}",
ZSTD_getErrorName(compressed_size));
- }
-
- compressed.resize(compressed_size);
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(compressed.data()),
compressed.size());
- avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
+ avro::encode(*encoderPtr_, static_cast<int64_t>(compressed.size()));
encoderPtr_->flush();
copy(*in, *stream_);
#endif
@@ -482,35 +471,15 @@ void DataFileReaderBase::readDataBlock() {
#ifdef ZSTD_CODEC_AVAILABLE
} else if (codec_ == ZSTD_CODEC) {
compressed_.clear();
+ uncompressed.clear();
const uint8_t *data;
size_t len;
while (st->next(&data, &len)) {
compressed_.insert(compressed_.end(), data, data + len);
}
- // Get the decompressed size
- size_t decompressed_size = ZSTD_getFrameContentSize(
- reinterpret_cast<const char *>(compressed_.data()),
compressed_.size());
- if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
- throw Exception("ZSTD: Not a valid compressed frame");
- } else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
- throw Exception("ZSTD: Unable to determine decompressed size");
- }
-
- // Decompress the data
- uncompressed.clear();
- uncompressed.resize(decompressed_size);
- size_t result = ZSTD_decompress(
- uncompressed.data(), decompressed_size,
- reinterpret_cast<const char *>(compressed_.data()),
compressed_.size());
-
- if (ZSTD_isError(result)) {
- throw Exception("ZSTD decompression error: {}",
ZSTD_getErrorName(result));
- }
- if (result != decompressed_size) {
- throw Exception("ZSTD: Decompressed size mismatch: expected {},
got {}",
- decompressed_size, result);
- }
+ ZstdDecompressWrapper zstdDecompressWrapper;
+ uncompressed = zstdDecompressWrapper.decompress(compressed_);
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(uncompressed.data()),
@@ -620,8 +589,7 @@ void DataFileReaderBase::readHeader() {
codec_ = SNAPPY_CODEC;
#endif
#ifdef ZSTD_CODEC_AVAILABLE
- } else if (it != metadata_.end()
- && toString(it->second) == AVRO_ZSTD_CODEC) {
+ } else if (it != metadata_.end() && toString(it->second) ==
AVRO_ZSTD_CODEC) {
codec_ = ZSTD_CODEC;
#endif
} else {
diff --git a/lang/c++/impl/ZstdCompressWrapper.cc
b/lang/c++/impl/ZstdCompressWrapper.cc
new file mode 100644
index 0000000000..b756e19d8e
--- /dev/null
+++ b/lang/c++/impl/ZstdCompressWrapper.cc
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include "ZstdCompressWrapper.hh"
+#include "Exception.hh"
+
+#include <zstd.h>
+
+namespace avro {
+
+// Compresses `uncompressed` into a ZSTD frame at ZSTD_CLEVEL_DEFAULT.
+//
+// The call runs on cctx_, the compression context created by the
+// constructor, so that context is actually used rather than only being
+// allocated and freed.
+//
+// Returns the compressed bytes, sized to exactly what ZSTD wrote.
+// Throws Exception if ZSTD reports an error while sizing or compressing.
+std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed) {
+    // Pre-allocate the worst-case output size so a single buffer always fits.
+    const size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
+    if (ZSTD_isError(max_compressed_size)) {
+        throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(max_compressed_size));
+    }
+    std::vector<char> compressed(max_compressed_size);
+
+    // One-shot frame compression through the context owned by this wrapper.
+    const size_t compressed_size = ZSTD_compressCCtx(
+        cctx_,
+        compressed.data(), max_compressed_size,
+        uncompressed.data(), uncompressed.size(),
+        ZSTD_CLEVEL_DEFAULT);
+
+    if (ZSTD_isError(compressed_size)) {
+        throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
+    }
+    // Drop the unused tail of the worst-case buffer.
+    compressed.resize(compressed_size);
+    return compressed;
+}
+
+// Creates the ZSTD compression context owned by this wrapper.
+// Throws Exception when ZSTD_createCCtx() returns null.
+ZstdCompressWrapper::ZstdCompressWrapper() : cctx_(ZSTD_createCCtx()) {
+    if (cctx_ == nullptr) {
+        throw Exception("ZSTD_createCCtx() failed");
+    }
+}
+
+// Releases the compression context obtained in the constructor.
+ZstdCompressWrapper::~ZstdCompressWrapper() {
+ ZSTD_freeCCtx(cctx_);
+}
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
diff --git a/lang/c++/impl/ZstdCompressWrapper.hh
b/lang/c++/impl/ZstdCompressWrapper.hh
new file mode 100644
index 0000000000..871dd711a6
--- /dev/null
+++ b/lang/c++/impl/ZstdCompressWrapper.hh
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_ZstdCompressWrapper_hh__
+#define avro_ZstdCompressWrapper_hh__
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include <vector>
+
+#include <zstd.h>
+
+namespace avro {
+
+// Owns a ZSTD compression context (ZSTD_CCtx) for the lifetime of the object
+// and exposes compression of a whole byte buffer.
+class ZstdCompressWrapper {
+public:
+    // Creates the context; throws Exception if ZSTD_createCCtx() fails.
+    ZstdCompressWrapper();
+    // Frees the context.
+    ~ZstdCompressWrapper();
+
+    // cctx_ is a uniquely owned raw handle: a copied wrapper would make two
+    // destructors free the same ZSTD_CCtx, so copying is disabled.
+    ZstdCompressWrapper(const ZstdCompressWrapper &) = delete;
+    ZstdCompressWrapper &operator=(const ZstdCompressWrapper &) = delete;
+
+    // Returns the ZSTD-compressed form of `uncompressed`; throws Exception
+    // on any ZSTD error.
+    std::vector<char> compress(const std::vector<char> &uncompressed);
+
+private:
+    ZSTD_CCtx *cctx_ = nullptr; // owned; released in the destructor
+};
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
+
+#endif // avro_ZstdCompressWrapper_hh__
diff --git a/lang/c++/impl/ZstdDecompressWrapper.cc
b/lang/c++/impl/ZstdDecompressWrapper.cc
new file mode 100644
index 0000000000..86d996dd31
--- /dev/null
+++ b/lang/c++/impl/ZstdDecompressWrapper.cc
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include "ZstdDecompressWrapper.hh"
+#include "Exception.hh"
+
+#include <zstd.h>
+
+namespace avro {
+
+// Decompresses the ZSTD frame held in `compressed` and returns the raw bytes.
+//
+// When the frame header records the content size, the output is allocated
+// once and filled in a single call; otherwise the frame is decoded chunk by
+// chunk with the streaming API. Both paths run on dctx_, the context created
+// by the constructor.
+//
+// Throws Exception if the input is not a valid frame, ends before the frame
+// is complete, declares a size that cannot be held in a std::string, or if
+// ZSTD reports a decompression error.
+std::string ZstdDecompressWrapper::decompress(const std::vector<char> &compressed) {
+    std::string uncompressed;
+    // Get the decompressed size. ZSTD_getFrameContentSize() returns
+    // unsigned long long; keep that width so the sentinel comparisons below
+    // stay exact on platforms where size_t is narrower.
+    const unsigned long long content_size =
+        ZSTD_getFrameContentSize(compressed.data(), compressed.size());
+    if (content_size == ZSTD_CONTENTSIZE_ERROR) {
+        throw Exception("ZSTD: Not a valid compressed frame");
+    }
+
+    if (content_size == ZSTD_CONTENTSIZE_UNKNOWN) {
+        // Stream decompress the data
+        ZSTD_inBuffer in{compressed.data(), compressed.size(), 0};
+        std::vector<char> tmp(ZSTD_DStreamOutSize());
+        ZSTD_outBuffer out{tmp.data(), tmp.size(), 0};
+        size_t ret = 0;
+        do {
+            out.pos = 0;
+            ret = ZSTD_decompressStream(dctx_, &out, &in);
+            if (ZSTD_isError(ret)) {
+                throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(ret));
+            }
+            // ret != 0 means the frame is not finished. If this call also
+            // produced no output and all input is consumed, no later call can
+            // make progress: the input ends in the middle of the frame.
+            if (ret != 0 && out.pos == 0 && in.pos == in.size) {
+                throw Exception("ZSTD: Truncated compressed frame");
+            }
+            uncompressed.append(tmp.data(), out.pos);
+        } while (ret != 0);
+        return uncompressed;
+    }
+
+    // Batch decompress the data
+    if (content_size > uncompressed.max_size()) {
+        throw Exception("ZSTD: Decompressed size {} is too large", content_size);
+    }
+    const size_t decompressed_size = static_cast<size_t>(content_size);
+    uncompressed.resize(decompressed_size);
+    const size_t result = ZSTD_decompressDCtx(
+        dctx_,
+        uncompressed.data(), decompressed_size,
+        compressed.data(), compressed.size());
+
+    if (ZSTD_isError(result)) {
+        throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
+    }
+    if (result != decompressed_size) {
+        throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
+                        decompressed_size, result);
+    }
+    return uncompressed;
+}
+
+// Creates the ZSTD decompression context owned by this wrapper.
+// Throws Exception when ZSTD_createDCtx() returns null.
+ZstdDecompressWrapper::ZstdDecompressWrapper() : dctx_(ZSTD_createDCtx()) {
+    if (dctx_ == nullptr) {
+        throw Exception("ZSTD_createDCtx() failed");
+    }
+}
+
+// Releases the decompression context obtained in the constructor.
+ZstdDecompressWrapper::~ZstdDecompressWrapper() {
+ ZSTD_freeDCtx(dctx_);
+}
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
diff --git a/lang/c++/impl/ZstdDecompressWrapper.hh
b/lang/c++/impl/ZstdDecompressWrapper.hh
new file mode 100644
index 0000000000..b5b97758c4
--- /dev/null
+++ b/lang/c++/impl/ZstdDecompressWrapper.hh
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_ZstdDecompressWrapper_hh__
+#define avro_ZstdDecompressWrapper_hh__
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include <string>
+#include <vector>
+
+#include <zstd.h>
+
+namespace avro {
+
+// Owns a ZSTD decompression context (ZSTD_DCtx) for the lifetime of the
+// object and exposes decompression of a whole compressed buffer.
+class ZstdDecompressWrapper {
+public:
+    // Creates the context; throws Exception if ZSTD_createDCtx() fails.
+    ZstdDecompressWrapper();
+    // Frees the context.
+    ~ZstdDecompressWrapper();
+
+    // dctx_ is a uniquely owned raw handle: a copied wrapper would make two
+    // destructors free the same ZSTD_DCtx, so copying is disabled.
+    ZstdDecompressWrapper(const ZstdDecompressWrapper &) = delete;
+    ZstdDecompressWrapper &operator=(const ZstdDecompressWrapper &) = delete;
+
+    // Returns the decompressed bytes of the ZSTD frame in `compressed`;
+    // throws Exception on invalid input or any ZSTD error.
+    std::string decompress(const std::vector<char> &compressed);
+
+private:
+    ZSTD_DCtx *dctx_ = nullptr; // owned; released in the destructor
+};
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
+
+#endif // avro_ZstdDecompressWrapper_hh__
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 3723615fb8..3a44970c55 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -892,6 +892,81 @@ void testSkipStringZstdCodec() {
}
#endif
+// In-memory form of one record of the test.Weather schema used by
+// testCompatibility(): station, time and temp.
+struct Weather {
+    std::string station;
+    int64_t time;
+    int32_t temp;
+
+    Weather(const char *station, int64_t time, int32_t temp)
+        : station(station), time(time), temp(temp) {}
+
+    // Two records are equal only when all three fields match.
+    bool operator==(const Weather &rhs) const {
+        if (station != rhs.station) {
+            return false;
+        }
+        if (time != rhs.time) {
+            return false;
+        }
+        return temp == rhs.temp;
+    }
+
+    // Writes the three fields separated by single spaces, so a failed
+    // BOOST_CHECK_EQUAL can print both records.
+    friend std::ostream &operator<<(std::ostream &out, const Weather &record) {
+        out << record.station << ' ' << record.time << ' ' << record.temp;
+        return out;
+    }
+};
+
+namespace avro {
+// codec_traits specialization for Weather that provides decode only; no
+// encode is defined here.
+template<>
+struct codec_traits<Weather> {
+ // Reads station, time and temp from the decoder, in that order, which is
+ // the field order of the reader schema built in testCompatibility().
+ static void decode(Decoder &d, Weather &v) {
+ avro::decode(d, v.station);
+ avro::decode(d, v.time);
+ avro::decode(d, v.temp);
+ }
+};
+} // namespace avro
+
+// Opens `filename` with the test.Weather reader schema and checks that it
+// yields exactly the five expected records, in order, and then reports
+// end of data.
+void testCompatibility(const char *filename) {
+    const char *readerSchemaStr = "{"
+                                  "\"type\": \"record\", \"name\": \"test.Weather\", \"fields\":["
+                                  "{\"name\": \"station\", \"type\": \"string\", \"order\": \"ignore\"},"
+                                  "{\"name\": \"time\", \"type\": \"long\"},"
+                                  "{\"name\": \"temp\", \"type\": \"int\"}"
+                                  "]}";
+    avro::ValidSchema readerSchema =
+        avro::compileJsonSchemaFromString(readerSchemaStr);
+    avro::DataFileReader<Weather> df(filename, readerSchema);
+
+    // Records the data file is expected to contain, in file order.
+    const Weather expected[] = {
+        Weather("011990-99999", -619524000000L, 0),
+        Weather("011990-99999", -619506000000L, 22),
+        Weather("011990-99999", -619484400000L, -11),
+        Weather("012650-99999", -655531200000L, 111),
+        Weather("012650-99999", -655509600000L, 78),
+    };
+
+    // Start from values that appear in no expected record.
+    Weather ro("", -1, -1);
+    for (const Weather &want : expected) {
+        BOOST_CHECK_EQUAL(df.read(ro), true);
+        BOOST_CHECK_EQUAL(ro, want);
+    }
+    // After the last record the reader must report that nothing is left.
+    BOOST_CHECK_EQUAL(df.read(ro), false);
+}
+
+// Runs the compatibility check against weather.avro.
+void testCompatibilityNullCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    const char *const dataFile = "../../share/test/data/weather.avro";
+    testCompatibility(dataFile);
+}
+
+// Runs the compatibility check against weather-deflate.avro.
+void testCompatibilityDeflateCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    const char *const dataFile = "../../share/test/data/weather-deflate.avro";
+    testCompatibility(dataFile);
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+// Runs the compatibility check against weather-snappy.avro.
+void testCompatibilitySnappyCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    const char *const dataFile = "../../share/test/data/weather-snappy.avro";
+    testCompatibility(dataFile);
+}
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+// Runs the compatibility check against weather-zstd.avro.
+void testCompatibilityZstdCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    const char *const dataFile = "../../share/test/data/weather-zstd.avro";
+    testCompatibility(dataFile);
+}
+#endif
+
struct TestRecord {
std::string s1;
int64_t id;
@@ -1376,6 +1451,15 @@ init_unit_test_suite(int, char *[]) {
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringZstdCodec));
#endif
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilityNullCodec));
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilityDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilitySnappyCodec));
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilityZstdCodec));
+#endif
+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncNullCodec));
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
#ifdef SNAPPY_CODEC_AVAILABLE
diff --git a/share/test/data/weather-deflate.avro
b/share/test/data/weather-deflate.avro
new file mode 100644
index 0000000000..b07db86b21
Binary files /dev/null and b/share/test/data/weather-deflate.avro differ
diff --git a/share/test/data/weather-zstd.avro
b/share/test/data/weather-zstd.avro
new file mode 100644
index 0000000000..6a66a34a23
Binary files /dev/null and b/share/test/data/weather-zstd.avro differ