This is an automated email from the ASF dual-hosted git repository.

thiru pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new f59db4906 AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290)
f59db4906 is described below

commit f59db4906b40f7bee4e4a7a80824e43c55cecabb
Author: Gang Wu <[email protected]>
AuthorDate: Fri Jan 17 01:01:12 2025 +0800

    AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290)
    
    * AVRO-4111: [C++] Replace boost::iostreams with zlib library
    
    * declare buf as uint8_t
    
    * fix lint
    
    * remove unused cmake variables
---
 .github/workflows/test-lang-c++-ARM.yml |   2 +-
 .github/workflows/test-lang-c++.yml     |   2 +-
 lang/c++/CMakeLists.txt                 |  17 +++-
 lang/c++/impl/DataFile.cc               | 160 ++++++++++++++++++++------------
 lang/c++/include/avro/DataFile.hh       |   3 -
 5 files changed, 114 insertions(+), 70 deletions(-)

diff --git a/.github/workflows/test-lang-c++-ARM.yml b/.github/workflows/test-lang-c++-ARM.yml
index f101eaeb2..759065b08 100644
--- a/.github/workflows/test-lang-c++-ARM.yml
+++ b/.github/workflows/test-lang-c++-ARM.yml
@@ -44,7 +44,7 @@ jobs:
       - name: Install dependencies
         run: |
           sudo apt-get update -q
-          sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev cmake
+          sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
 
       - name: Build
         run: |
diff --git a/.github/workflows/test-lang-c++.yml b/.github/workflows/test-lang-c++.yml
index 61afa7ff6..c0c66ceec 100644
--- a/.github/workflows/test-lang-c++.yml
+++ b/.github/workflows/test-lang-c++.yml
@@ -39,7 +39,7 @@ jobs:
       - uses: actions/checkout@v4
 
       - name: Install Dependencies
-        run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev cmake
+        run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake
 
       - name: Print Versions
         run: |
diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index e6f70bffd..1fc19f90a 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -110,6 +110,13 @@ else (SNAPPY_FOUND)
     message("Disabled snappy codec. libsnappy not found.")
 endif (SNAPPY_FOUND)
 
+find_package(ZLIB REQUIRED)
+if (ZLIB_FOUND)
+    message("Enabled zlib codec")
+else (ZLIB_FOUND)
+    message(FATAL_ERROR "ZLIB is not found")
+endif (ZLIB_FOUND)
+
 add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})
 
 add_definitions (-DAVRO_VERSION="${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH}")
@@ -140,8 +147,8 @@ set_property (TARGET avrocpp
     APPEND PROPERTY COMPILE_DEFINITIONS AVRO_DYN_LINK)
 
 add_library (avrocpp_s STATIC ${AVRO_SOURCE_FILES})
-target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR})
-target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
+target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
+target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
 
 set_property (TARGET avrocpp avrocpp_s
     APPEND PROPERTY COMPILE_DEFINITIONS AVRO_SOURCE)
@@ -152,8 +159,8 @@ set_target_properties (avrocpp PROPERTIES
 set_target_properties (avrocpp_s PROPERTIES
     VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH})
 
-target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
-target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
+target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
+target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
 
 target_include_directories(avrocpp PUBLIC
   $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@@ -209,7 +216,7 @@ if (AVRO_BUILD_TESTS)
 
     macro (unittest name)
         add_executable (${name} test/${name}.cc)
-        target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
+        target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES})
         add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
             COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
     endmacro (unittest)
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index 63ea7df20..fb0eaa2ac 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -23,15 +23,12 @@
 #include <random>
 #include <sstream>
 
-#include <boost/crc.hpp> // for boost::crc_32_type
-#include <boost/iostreams/device/file.hpp>
-#include <boost/iostreams/filter/gzip.hpp>
-#include <boost/iostreams/filter/zlib.hpp>
-
 #ifdef SNAPPY_CODEC_AVAILABLE
 #include <snappy.h>
 #endif
 
+#include <zlib.h>
+
 namespace avro {
 using std::copy;
 using std::istringstream;
@@ -55,12 +52,8 @@ const string AVRO_SNAPPY_CODEC = "snappy";
 const size_t minSyncInterval = 32;
 const size_t maxSyncInterval = 1u << 30;
 
-boost::iostreams::zlib_params get_zlib_params() {
-    boost::iostreams::zlib_params ret;
-    ret.method = boost::iostreams::zlib::deflated;
-    ret.noheader = true;
-    return ret;
-}
+// Recommended by https://www.zlib.net/zlib_how.html
+const size_t zlibBufGrowSize = 128 * 1024;
 
 } // namespace
 
@@ -144,21 +137,45 @@ void DataFileWriterBase::sync() {
         std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
         copy(*in, *stream_);
     } else if (codec_ == DEFLATE_CODEC) {
-        std::vector<char> buf;
+        std::vector<uint8_t> buf;
         {
-            boost::iostreams::filtering_ostream os;
-            os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
-            os.push(boost::iostreams::back_inserter(buf));
-            const uint8_t *data;
-            size_t len;
+            z_stream zs;
+            zs.zalloc = Z_NULL;
+            zs.zfree = Z_NULL;
+            zs.opaque = Z_NULL;
+
+            int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
+            if (ret != Z_OK) {
+                throw Exception("Failed to initialize deflate, error: {}", ret);
+            }
 
             std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
-            while (input->next(&data, &len)) {
-                boost::iostreams::write(os, reinterpret_cast<const char *>(data), len);
+            const uint8_t *data;
+            size_t len;
+            while (ret != Z_STREAM_END && input->next(&data, &len)) {
+                zs.avail_in = static_cast<uInt>(len);
+                zs.next_in = const_cast<Bytef *>(data);
+                bool flush = (zs.total_in + len) >= buffer_->byteCount();
+                do {
+                    if (zs.total_out == buf.size()) {
+                        buf.resize(buf.size() + zlibBufGrowSize);
+                    }
+                    zs.avail_out = static_cast<uInt>(buf.size() - zs.total_out);
+                    zs.next_out = buf.data() + zs.total_out;
+                    ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH);
+                    if (ret == Z_STREAM_END) {
+                        break;
+                    }
+                    if (ret != Z_OK) {
+                        throw Exception("Failed to deflate, error: {}", ret);
+                    }
+                } while (zs.avail_out == 0);
             }
+
+            buf.resize(zs.total_out);
+            (void) deflateEnd(&zs);
         } // make sure all is flushed
-        std::unique_ptr<InputStream> in = memoryInputStream(
-            reinterpret_cast<const uint8_t *>(buf.data()), buf.size());
+        std::unique_ptr<InputStream> in = memoryInputStream(buf.data(), buf.size());
         int64_t byteCount = buf.size();
         avro::encode(*encoderPtr_, byteCount);
         encoderPtr_->flush();
@@ -167,35 +184,28 @@ void DataFileWriterBase::sync() {
     } else if (codec_ == SNAPPY_CODEC) {
         std::vector<char> temp;
         std::string compressed;
-        boost::crc_32_type crc;
-        {
-            boost::iostreams::filtering_ostream os;
-            os.push(boost::iostreams::back_inserter(temp));
-            const uint8_t *data;
-            size_t len;
 
-            std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
-            while (input->next(&data, &len)) {
-                boost::iostreams::write(os, reinterpret_cast<const char *>(data),
-                                        len);
-            }
-        } // make sure all is flushed
+        const uint8_t *data;
+        size_t len;
+        std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
+        while (input->next(&data, &len)) {
+            temp.insert(temp.end(), reinterpret_cast<const char *>(data),
+                        reinterpret_cast<const char *>(data) + len);
+        }
 
-        crc.process_bytes(reinterpret_cast<const char *>(temp.data()),
-                          temp.size());
         // For Snappy, add the CRC32 checksum
-        auto checksum = crc();
+        auto checksum = crc32(0, reinterpret_cast<const Bytef *>(temp.data()),
+                              static_cast<uInt>(temp.size()));
 
         // Now compress
         size_t compressed_size = snappy::Compress(
             reinterpret_cast<const char *>(temp.data()), temp.size(),
             &compressed);
+
         temp.clear();
-        {
-            boost::iostreams::filtering_ostream os;
-            os.push(boost::iostreams::back_inserter(temp));
-            boost::iostreams::write(os, compressed.c_str(), compressed_size);
-        }
+        temp.insert(temp.end(), compressed.c_str(),
+                    compressed.c_str() + compressed_size);
+
         temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
         temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
         temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
@@ -285,8 +295,7 @@ void DataFileReaderBase::init(const ValidSchema &readerSchema) {
 static void drain(InputStream &in) {
     const uint8_t *p = nullptr;
     size_t n = 0;
-    while (in.next(&p, &n))
-        ;
+    while (in.next(&p, &n));
 }
 
 char hex(unsigned int x) {
@@ -384,7 +393,6 @@ void DataFileReaderBase::readDataBlock() {
         dataStream_ = std::move(st);
 #ifdef SNAPPY_CODEC_AVAILABLE
     } else if (codec_ == SNAPPY_CODEC) {
-        boost::crc_32_type crc;
         uint32_t checksum = 0;
         compressed_.clear();
         uncompressed.clear();
@@ -408,35 +416,67 @@ void DataFileReaderBase::readDataBlock() {
             throw Exception(
                 "Snappy Compression reported an error when decompressing");
         }
-        crc.process_bytes(uncompressed.c_str(), uncompressed.size());
-        auto c = crc();
+        auto c = crc32(0, reinterpret_cast<const Bytef *>(uncompressed.c_str()),
+                       static_cast<uInt>(uncompressed.size()));
         if (checksum != c) {
             throw Exception(
                 "Checksum did not match for Snappy compression: Expected: {}, computed: {}",
                 checksum, c);
         }
-        os_.reset(new boost::iostreams::filtering_istream());
-        os_->push(
-            boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
-                                                       uncompressed.size()));
-        std::unique_ptr<InputStream> in = istreamInputStream(*os_);
+
+        std::unique_ptr<InputStream> in = memoryInputStream(
+            reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
+            uncompressed.size());
 
         dataDecoder_->init(*in);
         dataStream_ = std::move(in);
 #endif
     } else {
         compressed_.clear();
-        const uint8_t *data;
-        size_t len;
-        while (st->next(&data, &len)) {
-            compressed_.insert(compressed_.end(), data, data + len);
+        uncompressed.clear();
+
+        {
+            z_stream zs;
+            zs.zalloc = Z_NULL;
+            zs.zfree = Z_NULL;
+            zs.opaque = Z_NULL;
+            zs.avail_in = 0;
+            zs.next_in = Z_NULL;
+
+            int ret = inflateInit2(&zs, /*windowBits=*/-15);
+            if (ret != Z_OK) {
+                throw Exception("Failed to initialize inflate, error: {}", ret);
+            }
+
+            const uint8_t *data;
+            size_t len;
+            while (ret != Z_STREAM_END && st->next(&data, &len)) {
+                zs.avail_in = static_cast<uInt>(len);
+                zs.next_in = const_cast<Bytef *>(data);
+                do {
+                    if (zs.total_out == uncompressed.size()) {
+                        uncompressed.resize(uncompressed.size() + zlibBufGrowSize);
+                    }
+                    zs.avail_out = static_cast<uInt>(uncompressed.size() - zs.total_out);
+                    zs.next_out = reinterpret_cast<Bytef *>(uncompressed.data() + zs.total_out);
+                    ret = inflate(&zs, Z_NO_FLUSH);
+                    if (ret == Z_STREAM_END) {
+                        break;
+                    }
+                    if (ret != Z_OK) {
+                        throw Exception("Failed to inflate, error: {}", ret);
+                    }
+                } while (zs.avail_out == 0);
+            }
+
+            uncompressed.resize(zs.total_out);
+            (void) inflateEnd(&zs);
         }
-        os_.reset(new boost::iostreams::filtering_istream());
-        os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
-        os_->push(boost::iostreams::basic_array_source<char>(
-            compressed_.data(), compressed_.size()));
 
-        std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
+        std::unique_ptr<InputStream> in = memoryInputStream(
+            reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
+            uncompressed.size());
+
         dataDecoder_->init(*in);
         dataStream_ = std::move(in);
     }
diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh
index dcfddf774..6b2cdab5c 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -31,8 +31,6 @@
 #include <string>
 #include <vector>
 
-#include <boost/iostreams/filtering_stream.hpp>
-
 namespace avro {
 
 /** Specify type of compression to use when writing data files. */
@@ -216,7 +214,6 @@ class AVRO_DECL DataFileReaderBase {
     DataFileSync sync_{};
 
     // for compressed buffer
-    std::unique_ptr<boost::iostreams::filtering_istream> os_;
     std::vector<char> compressed_;
     std::string uncompressed;
     void readHeader();

Reply via email to