This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 37d1180cca [feature-wip](parquet-reader)decode parquet data (#11536)
37d1180cca is described below

commit 37d1180cca26494f65ec6074ab68a4350cf083e6
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Aug 8 12:44:06 2022 +0800

    [feature-wip](parquet-reader)decode parquet data (#11536)
---
 be/src/common/config.h                             |   1 +
 be/src/io/buffered_reader.cpp                      |  19 ++-
 be/src/io/buffered_reader.h                        |  22 ++--
 be/src/util/block_compression.cpp                  |  30 +++++
 be/src/util/block_compression.h                    |   4 +
 be/src/util/rle_encoding.h                         |  56 +++++++++
 be/src/vec/CMakeLists.txt                          |   4 +-
 be/src/vec/exec/format/parquet/level_decoder.cpp   |  76 ++++++++++++
 be/src/vec/exec/format/parquet/level_decoder.h     |  61 ++++++++++
 be/src/vec/exec/format/parquet/parquet_common.cpp  |  60 ++++++++++
 be/src/vec/exec/format/parquet/parquet_common.h    | 108 +++++++++++++++++
 be/src/vec/exec/format/parquet/schema_desc.cpp     |  11 +-
 .../parquet/vparquet_column_chunk_reader.cpp       | 130 +++++++++++++++++++--
 .../format/parquet/vparquet_column_chunk_reader.h  | 111 +++++++++++++++++-
 .../exec/format/parquet/vparquet_page_reader.cpp   |  30 +++--
 .../vec/exec/format/parquet/vparquet_page_reader.h |  16 ++-
 .../test_data/parquet_scanner/type-decoder.parquet | Bin 0 -> 338 bytes
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  55 +++++++++
 18 files changed, 734 insertions(+), 60 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 258000e540..59e56bcde3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -792,6 +792,7 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 CONF_Bool(parquet_predicate_push_down, "true");
+CONF_Int32(parquet_header_max_size, "8388608");
 
 // When the rows number reached this limit, will check the filter rate the of 
bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index 8e2446b9e1..ca40979321 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -185,10 +185,11 @@ bool BufferedReader::closed() {
     return _reader->closed();
 }
 
-BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, int64_t 
offset, int64_t length)
+BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t 
offset,
+                                                   uint64_t length)
         : _file(file), _file_start_offset(offset), _file_end_offset(offset + 
length) {}
 
-Status BufferedFileStreamReader::seek(int64_t position) {
+Status BufferedFileStreamReader::seek(uint64_t position) {
     if (_file_position != position) {
         RETURN_IF_ERROR(_file->seek(position));
         _file_position = position;
@@ -196,8 +197,8 @@ Status BufferedFileStreamReader::seek(int64_t position) {
     return Status::OK();
 }
 
-Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, int64_t 
offset,
-                                            int64_t* bytes_to_read) {
+Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t 
offset,
+                                            size_t* bytes_to_read) {
     if (offset < _file_start_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
@@ -230,19 +231,15 @@ Status BufferedFileStreamReader::read_bytes(const 
uint8_t** buf, int64_t offset,
     RETURN_IF_ERROR(seek(_buf_end_offset));
     bool eof = false;
     int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
-    RETURN_IF_ERROR(
-            _file->read(_buf.get() + buf_remaining, _buf_size - buf_remaining, 
&to_read, &eof));
+    RETURN_IF_ERROR(_file->read(_buf.get() + buf_remaining, to_read, &to_read, 
&eof));
     *bytes_to_read = buf_remaining + to_read;
     _buf_end_offset += to_read;
     *buf = _buf.get();
     return Status::OK();
 }
 
-Status BufferedFileStreamReader::read_bytes(Slice& slice, int64_t offset) {
-    int64_t bytes_to_read = slice.size;
-    Status st = read_bytes((const uint8_t**)&slice.data, offset, 
&bytes_to_read);
-    slice.size = bytes_to_read;
-    return st;
+Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
+    return read_bytes((const uint8_t**)&slice.data, offset, &slice.size);
 }
 
 } // namespace doris
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index d4a5f37927..2cfcaaa413 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -93,34 +93,34 @@ public:
      * @param offset start offset ot read in stream
      * @param bytes_to_read bytes to read
      */
-    virtual Status read_bytes(const uint8_t** buf, int64_t offset, int64_t* 
bytes_to_read) = 0;
+    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* 
bytes_to_read) = 0;
     /**
      * Save the data address to slice.data, and the slice.size is the bytes to 
read.
      */
-    virtual Status read_bytes(Slice& slice, int64_t offset) = 0;
+    virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
     virtual ~BufferedStreamReader() = default;
 };
 
 class BufferedFileStreamReader : public BufferedStreamReader {
 public:
-    BufferedFileStreamReader(FileReader* file, int64_t offset, int64_t length);
+    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t 
length);
     ~BufferedFileStreamReader() override = default;
 
-    Status read_bytes(const uint8_t** buf, int64_t stream_offset, int64_t* 
bytes_to_read) override;
-    Status read_bytes(Slice& slice, int64_t stream_offset) override;
+    Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* 
bytes_to_read) override;
+    Status read_bytes(Slice& slice, uint64_t offset) override;
 
 private:
     std::unique_ptr<uint8_t[]> _buf;
     FileReader* _file;
-    int64_t _file_start_offset;
-    int64_t _file_end_offset;
+    uint64_t _file_start_offset;
+    uint64_t _file_end_offset;
 
     int64_t _file_position = -1;
-    int64_t _buf_start_offset = 0;
-    int64_t _buf_end_offset = 0;
-    int64_t _buf_size = 0;
+    uint64_t _buf_start_offset = 0;
+    uint64_t _buf_end_offset = 0;
+    size_t _buf_size = 0;
 
-    Status seek(int64_t position);
+    Status seek(uint64_t position);
 };
 
 } // namespace doris
diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index 4279080b21..1dafd20f54 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -550,4 +550,34 @@ Status 
get_block_compression_codec(segment_v2::CompressionTypePB type,
     return st;
 }
 
+// Build the block compression codec that corresponds to a parquet codec type.
+// UNCOMPRESSED clears `codec` and returns OK; an unrecognized type returns
+// NotFound without touching `codec`. Otherwise `codec` is replaced only when
+// the freshly created codec's init() succeeds.
+Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
+                                   std::unique_ptr<BlockCompressionCodec>& codec) {
+    // Owned from creation, so a failed init() releases the codec automatically.
+    std::unique_ptr<BlockCompressionCodec> candidate;
+    switch (parquet_codec) {
+    case tparquet::CompressionCodec::UNCOMPRESSED:
+        codec.reset(nullptr);
+        return Status::OK();
+    case tparquet::CompressionCodec::SNAPPY:
+        candidate.reset(new SnappyBlockCompression());
+        break;
+    case tparquet::CompressionCodec::LZ4:
+        candidate.reset(new Lz4BlockCompression());
+        break;
+    case tparquet::CompressionCodec::ZSTD:
+        candidate.reset(new ZstdBlockCompression());
+        break;
+    default:
+        return Status::NotFound("unknown compression type({})", parquet_codec);
+    }
+
+    Status init_status = candidate->init();
+    if (init_status.ok()) {
+        codec.reset(candidate.release());
+    }
+    return init_status;
+}
+
 } // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ddda23a3ba..9de8eb16ff 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "gen_cpp/parquet_types.h"
 #include "gen_cpp/segment_v2.pb.h"
 #include "util/slice.h"
 
@@ -70,4 +71,7 @@ public:
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& 
codec);
 
+Status get_block_compression_codec(tparquet::CompressionCodec::type 
parquet_codec,
+                                   std::unique_ptr<BlockCompressionCodec>& 
codec);
+
 } // namespace doris
diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index 3340669b78..08a7a23a4d 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -110,6 +110,14 @@ public:
     // GetNextRun will return more from the same run.
     size_t GetNextRun(T* val, size_t max_run);
 
+    size_t get_values(T* values, size_t num_values);
+
+    // Get the count of current repeated value
+    size_t repeated_count();
+
+    // Get current repeated value, make sure that count equals repeated_count()
+    T get_repeated_value(size_t count);
+
 private:
     bool ReadHeader();
 
@@ -334,6 +342,54 @@ inline size_t RleDecoder<T>::GetNextRun(T* val, size_t 
max_run) {
     return ret;
 }
 
+// Decode up to `num_values` values into `values`, consuming repeated and
+// literal runs (and reading new run headers) as needed.
+// Returns the number of values actually written; this is smaller than
+// `num_values` when no further run header can be read or when a literal value
+// cannot be read from the bit stream.
+template <typename T>
+inline size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
+    size_t read_num = 0;
+    while (read_num < num_values) {
+        size_t read_this_time = num_values - read_num;
+
+        if (LIKELY(repeat_count_ > 0)) {
+            read_this_time = std::min((size_t)repeat_count_, read_this_time);
+            std::fill(values, values + read_this_time, current_value_);
+            values += read_this_time;
+            repeat_count_ -= read_this_time;
+            read_num += read_this_time;
+        } else if (literal_count_ > 0) {
+            read_this_time = std::min((size_t)literal_count_, read_this_time);
+            // The index has the same type as its bound: no signed/unsigned
+            // comparison and no int overflow for large runs.
+            for (size_t i = 0; i < read_this_time; ++i) {
+                if (UNLIKELY(!bit_reader_.GetValue(bit_width_, values))) {
+                    // The literal run is truncated. Account only for what was
+                    // decoded and stop, rather than reporting undecoded slots
+                    // as valid output.
+                    literal_count_ -= i;
+                    return read_num + i;
+                }
+                values++;
+            }
+            literal_count_ -= read_this_time;
+            read_num += read_this_time;
+        } else {
+            if (!ReadHeader()) {
+                return read_num;
+            }
+        }
+    }
+    return read_num;
+}
+
+// Report the pending count of the current repeated run. When neither a
+// repeated run nor a literal run is in progress, the next run header is read
+// first; the (possibly unchanged) repeat count is returned either way.
+template <typename T>
+inline size_t RleDecoder<T>::repeated_count() {
+    const bool no_repeated_run = !(repeat_count_ > 0);
+    if (no_repeated_run && literal_count_ == 0) {
+        ReadHeader();
+    }
+    return repeat_count_;
+}
+
+// Consume `count` entries of the current repeated run and return its value.
+// The caller must not ask for more than the run holds (checked in debug builds).
+template <typename T>
+inline T RleDecoder<T>::get_repeated_value(size_t count) {
+    DCHECK_GE(repeat_count_, count);
+    const auto repeated = current_value_;
+    repeat_count_ -= count;
+    return repeated;
+}
+
 template <typename T>
 inline size_t RleDecoder<T>::Skip(size_t to_skip) {
     DCHECK(bit_reader_.is_initialized());
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 56fdb70a4b..bc37c49310 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -229,7 +229,9 @@ set(VEC_FILES
   exec/format/parquet/vparquet_file_metadata.cpp
   exec/format/parquet/vparquet_page_reader.cpp
   exec/format/parquet/schema_desc.cpp
-  exec/format/parquet/vparquet_column_reader.cpp)
+  exec/format/parquet/vparquet_column_reader.cpp
+  exec/format/parquet/level_decoder.cpp
+  exec/format/parquet/parquet_common.cpp)
 
 add_library(Vec STATIC
     ${VEC_FILES}
diff --git a/be/src/vec/exec/format/parquet/level_decoder.cpp 
b/be/src/vec/exec/format/parquet/level_decoder.cpp
new file mode 100644
index 0000000000..616da10b8e
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/level_decoder.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "level_decoder.h"
+
+#include "util/bit_util.h"
+#include "util/coding.h"
+
+// Prepare the decoder for the level section at the front of `slice`.
+// On success `slice` is advanced past the bytes that belong to the levels, so
+// it points at whatever follows them in the page.
+//   slice      - page data; data/size are updated in place
+//   encoding   - level encoding (RLE or BIT_PACKED)
+//   max_level  - maximum level value, used to derive the bit width
+//   num_levels - number of levels stored in this page
+// Returns Corruption when the level section does not fit in `slice`, and
+// IOError for any other encoding.
+doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
+                                                    tparquet::Encoding::type encoding,
+                                                    doris::vectorized::level_t max_level,
+                                                    uint32_t num_levels) {
+    _encoding = encoding;
+    _bit_width = BitUtil::log2(max_level + 1);
+    _max_level = max_level;
+    _num_levels = num_levels;
+    switch (encoding) {
+    case tparquet::Encoding::RLE: {
+        // RLE levels are prefixed with a 4-byte little-endian byte count.
+        if (slice->size < 4) {
+            return Status::Corruption("Wrong parquet level format");
+        }
+
+        uint8_t* data = (uint8_t*)slice->data;
+        uint32_t num_bytes = decode_fixed32_le(data);
+        if (num_bytes > slice->size - 4) {
+            return Status::Corruption("Wrong parquet level format");
+        }
+        _rle_decoder = RleDecoder<level_t>(data + 4, num_bytes, _bit_width);
+
+        slice->data += 4 + num_bytes;
+        slice->size -= 4 + num_bytes;
+        break;
+    }
+    case tparquet::Encoding::BIT_PACKED: {
+        // Use 64-bit arithmetic: num_levels * _bit_width can exceed 32 bits, and a
+        // wrapped product would let an oversized level section pass the bounds check.
+        const uint64_t num_bits = static_cast<uint64_t>(num_levels) * _bit_width;
+        const uint64_t num_bytes = (num_bits + 7) / 8;
+        if (num_bytes > slice->size) {
+            return Status::Corruption("Wrong parquet level format");
+        }
+        _bit_packed_decoder =
+                BitReader((uint8_t*)slice->data, static_cast<uint32_t>(num_bytes));
+
+        slice->data += num_bytes;
+        slice->size -= num_bytes;
+        break;
+    }
+    default:
+        return Status::IOError("Unsupported encoding for parquet level");
+    }
+    return Status::OK();
+}
+
+// Decode up to `n` levels into `levels` and return how many were produced.
+// Only RLE-encoded levels are decoded; every other encoding yields 0.
+size_t doris::vectorized::LevelDecoder::get_levels(doris::vectorized::level_t* levels, size_t n) {
+    if (_encoding != tparquet::Encoding::RLE) {
+        // TODO(gaoxin): BIT_PACKED encoding
+        return 0;
+    }
+    // Never ask for more levels than remain in this page.
+    const size_t wanted = std::min((size_t)_num_levels, n);
+    const auto num_decoded = _rle_decoder.get_values(levels, wanted);
+    _num_levels -= num_decoded;
+    return num_decoded;
+}
diff --git a/be/src/vec/exec/format/parquet/level_decoder.h 
b/be/src/vec/exec/format/parquet/level_decoder.h
new file mode 100644
index 0000000000..be6c6c6154
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/level_decoder.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "parquet_common.h"
+#include "util/bit_stream_utils.h"
+#include "util/rle_encoding.h"
+
+namespace doris::vectorized {
+
+// Decodes the repetition or definition levels stored at the front of a
+// parquet data page. Call init() for every page before reading levels.
+class LevelDecoder {
+public:
+    LevelDecoder() = default;
+    ~LevelDecoder() = default;
+
+    // Bind the decoder to the level section at the front of `slice` and
+    // advance `slice` past it.
+    Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level,
+                uint32_t num_levels);
+
+    // True while levels of the current page remain to be read.
+    bool has_levels() const { return _num_levels > 0; }
+
+    // Decode up to `n` levels into `levels`; returns the number decoded.
+    size_t get_levels(level_t* levels, size_t n);
+
+    // Count of the current repeated run (RLE encoding only).
+    size_t next_repeated_count() {
+        DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
+        return _rle_decoder.repeated_count();
+    }
+
+    // Consume `count` entries of the current repeated run and return its
+    // value (RLE encoding only).
+    level_t get_repeated_value(size_t count) {
+        DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
+        return _rle_decoder.get_repeated_value(count);
+    }
+
+private:
+    // Initialized to a non-level encoding so that get_levels() on a decoder
+    // that was never init()-ed reads a defined value and returns 0.
+    tparquet::Encoding::type _encoding = tparquet::Encoding::PLAIN;
+    level_t _bit_width = 0;
+    level_t _max_level = 0;
+    uint32_t _num_levels = 0;
+    RleDecoder<level_t> _rle_decoder;
+    BitReader _bit_packed_decoder;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
new file mode 100644
index 0000000000..ec0c3ce411
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet_common.h"
+
+namespace doris::vectorized {
+
+// Create the value decoder for the given physical `type` and page `encoding`
+// and store it in `decoder`.
+// Returns InternalError, leaving `decoder` untouched, when the combination is
+// not supported. Only PLAIN encoding of INT32/INT64/FLOAT/DOUBLE is supported.
+Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                           std::unique_ptr<Decoder>& decoder) {
+    switch (encoding) {
+    case tparquet::Encoding::PLAIN:
+        switch (type) {
+        case tparquet::Type::INT32:
+            decoder.reset(new PlainDecoder<Int32>());
+            break;
+        case tparquet::Type::INT64:
+            decoder.reset(new PlainDecoder<Int64>());
+            break;
+        case tparquet::Type::FLOAT:
+            decoder.reset(new PlainDecoder<Float32>());
+            break;
+        case tparquet::Type::DOUBLE:
+            decoder.reset(new PlainDecoder<Float64>());
+            break;
+        default:
+            return Status::InternalError("Unsupported plain type {} in parquet decoder",
+                                         tparquet::to_string(type));
+        }
+        // Explicit break: do not fall through into the dictionary case.
+        break;
+    case tparquet::Encoding::RLE_DICTIONARY:
+        // Dictionary decoding is not implemented yet. Reporting success here would
+        // hand the caller no decoder (or a stale one) for this page.
+        return Status::InternalError("Unsupported encoding {} in parquet decoder",
+                                     tparquet::to_string(encoding));
+    default:
+        return Status::InternalError("Unsupported encoding {} in parquet decoder",
+                                     tparquet::to_string(encoding));
+    }
+    return Status::OK();
+}
+
+// Return the nested data column of the nullable `doris_column` as a mutable
+// column pointer. Aborts (CHECK) when `doris_column` is not nullable.
+MutableColumnPtr Decoder::getMutableColumnPtr(ColumnPtr& doris_column) {
+    // src column always be nullable for simple converting
+    CHECK(doris_column->is_nullable());
+    // Hold the result of mutate() in a named local. Taking .get() straight from
+    // the temporary leaves a raw pointer whose owner is destroyed at the end of
+    // that statement, before the pointer is dereferenced below.
+    // NOTE(review): presumably mutate() can return a fresh copy when the column
+    // is shared, in which case the temporary is the sole owner - confirm.
+    auto mutable_column = (*std::move(doris_column)).mutate();
+    auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
+    return nullable_column->get_nested_column_ptr();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
new file mode 100644
index 0000000000..0f0e9f6e3d
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_nullable.h"
+
+namespace doris::vectorized {
+
+using level_t = int16_t;
+
+// Base class of the parquet page value decoders. A decoder reads values from
+// the slice given to set_data(), starting at offset 0.
+class Decoder {
+public:
+    Decoder() = default;
+    virtual ~Decoder() = default;
+
+    // Create the decoder for `type`/`encoding` and store it in `decoder`.
+    static Status getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                             std::unique_ptr<Decoder>& decoder);
+
+    // Return the nested data column of the nullable `doris_column`.
+    static MutableColumnPtr getMutableColumnPtr(ColumnPtr& doris_column);
+
+    // The type with fix length
+    void set_type_length(int32_t type_length) { _type_length = type_length; }
+
+    // Set the data to be decoded
+    void set_data(Slice* data) {
+        _data = data;
+        _offset = 0;
+    }
+
+    // Write the decoded values batch to doris's column
+    virtual Status decode_values(ColumnPtr& doris_column, size_t num_values) = 0;
+
+    // Write the decoded values batch into the caller-provided slice
+    virtual Status decode_values(Slice& slice, size_t num_values) = 0;
+
+    // Advance past `num_values` values without decoding them
+    virtual Status skip_values(size_t num_values) = 0;
+
+protected:
+    // Stays -1 until set_type_length() is called, so the member never holds
+    // an indeterminate value.
+    int32_t _type_length = -1;
+    Slice* _data = nullptr;
+    uint32_t _offset = 0;
+};
+
+// Decoder for PLAIN-encoded fixed-width values of type T: the page data is
+// treated as a packed array of T. Every operation is bounds-checked against
+// the slice given to set_data() and returns IOError instead of reading past it.
+template <typename T>
+class PlainDecoder final : public Decoder {
+public:
+    PlainDecoder() = default;
+    ~PlainDecoder() override = default;
+
+    // Append the next `num_values` values to the nested data column of the
+    // nullable `doris_column`.
+    Status decode_values(ColumnPtr& doris_column, size_t num_values) override {
+        size_t to_read_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        auto data_column = getMutableColumnPtr(doris_column);
+        auto& column_data = static_cast<ColumnVector<T>&>(*data_column).get_data();
+        const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset);
+        column_data.insert(raw_data, raw_data + num_values);
+        _offset += to_read_bytes;
+        return Status::OK();
+    }
+
+    // Copy the raw bytes of the next `num_values` values into `slice`, which
+    // must be large enough to hold them.
+    Status decode_values(Slice& slice, size_t num_values) override {
+        size_t to_read_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        if (UNLIKELY(to_read_bytes > slice.size)) {
+            return Status::IOError(
+                    "Slice does not have enough space to write out the decoding data");
+        }
+        memcpy(slice.data, _data->data + _offset, to_read_bytes);
+        _offset += to_read_bytes;
+        return Status::OK();
+    }
+
+    // Advance past the next `num_values` values.
+    Status skip_values(size_t num_values) override {
+        // Check in size_t before committing. Adding to the 32-bit _offset first
+        // could wrap and slip past the bounds check, and would leave the decoder
+        // moved even when the skip is rejected.
+        const size_t to_skip_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_skip_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        _offset += to_skip_bytes;
+        return Status::OK();
+    }
+
+protected:
+    enum { TYPE_LENGTH = sizeof(T) };
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp 
b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 51969220ed..7f6f36f023 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -17,8 +17,6 @@
 
 #include "schema_desc.h"
 
-#include "gutil/strings/substitute.h"
-
 namespace doris::vectorized {
 
 static bool is_group_node(const tparquet::SchemaElement& schema) {
@@ -98,15 +96,14 @@ Status FieldDescriptor::parse_from_thrift(const 
std::vector<tparquet::SchemaElem
     for (int i = 0; i < root_schema.num_children; ++i) {
         RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, 
&_fields[i]));
         if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
-            return Status::InvalidArgument(
-                    strings::Substitute("Duplicated field name: {}", 
_fields[i].name));
+            return Status::InvalidArgument("Duplicated field name: {}", 
_fields[i].name);
         }
         _name_to_field.emplace(_fields[i].name, &_fields[i]);
     }
 
     if (_next_schema_pos != t_schemas.size()) {
-        return Status::InvalidArgument(strings::Substitute("Remaining {} 
unparsed schema elements",
-                                                           t_schemas.size() - 
_next_schema_pos));
+        return Status::InvalidArgument("Remaining {} unparsed schema elements",
+                                       t_schemas.size() - _next_schema_pos);
     }
 
     return Status::OK();
@@ -355,7 +352,7 @@ int FieldDescriptor::get_column_index(const std::string& 
column) const {
     return -1;
 }
 
-const FieldSchema* FieldDescriptor::get_column(const string& name) const {
+const FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
     auto it = _name_to_field.find(name);
     if (it != _name_to_field.end()) {
         return it->second;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 274882179b..11d6bfaf94 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -14,27 +14,135 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 #include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
-Status ColumnChunkReader::init() {
-    return Status();
+ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
+                                     tparquet::ColumnChunk* column_chunk, 
FieldSchema* fieldSchema)
+        : _max_rep_level(fieldSchema->repetition_level),
+          _max_def_level(fieldSchema->definition_level),
+          _stream_reader(reader),
+          _metadata(column_chunk->meta_data) {}
+
+// Set up the page reader over this column chunk, process the dictionary page
+// when the chunk metadata declares one, position the reader on the first data
+// page, and create the block compression codec for the chunk.
+Status ColumnChunkReader::init(size_t type_length) {
+    const bool has_dict_page = _metadata.__isset.dictionary_page_offset;
+    // The chunk starts at the dictionary page if there is one.
+    size_t start_offset =
+            has_dict_page ? _metadata.dictionary_page_offset : _metadata.data_page_offset;
+    size_t chunk_size = _metadata.total_compressed_size;
+    _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
+
+    if (has_dict_page) {
+        RETURN_IF_ERROR(_decode_dict_page());
+    }
+    // seek to the first data page
+    _page_reader->seek_to_page(_metadata.data_page_offset);
+
+    // get the block compression codec
+    RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
+    // -1 means unfixed length type
+    _type_length = type_length;
+
+    return Status::OK();
+}
+
+// Advance the page reader to the next page header and record the number of
+// values announced by its data page header.
+Status ColumnChunkReader::next_page() {
+    RETURN_IF_ERROR(_page_reader->next_page());
+    const auto& data_header = _page_reader->get_page_header()->data_page_header;
+    _num_values = data_header.num_values;
+    return Status::OK();
+}
+
+// Load the payload of the current page: read it (decompressing when a codec is
+// configured), initialize the repetition/definition level decoders, and create
+// the value decoder for the page encoding.
+// Returns the first error from reading, decompressing, level initialization or
+// decoder creation.
+Status ColumnChunkReader::load_page_data() {
+    const auto& header = *_page_reader->get_page_header();
+    // int32_t compressed_size = header.compressed_page_size;
+    int32_t uncompressed_size = header.uncompressed_page_size;
+
+    if (_block_compress_codec != nullptr) {
+        Slice compressed_data;
+        RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+        // check decompressed buffer size
+        _reserve_decompress_buf(uncompressed_size);
+        _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
+    } else {
+        RETURN_IF_ERROR(_page_reader->get_page_date(_page_data));
+    }
+
+    // Initialize repetition level and definition level. Skip when level = 0,
+    // which means required field.
+    if (_max_rep_level > 0) {
+        RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
+                                                header.data_page_header.repetition_level_encoding,
+                                                _max_rep_level, _num_values));
+    }
+    if (_max_def_level > 0) {
+        RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
+                                                header.data_page_header.definition_level_encoding,
+                                                _max_def_level, _num_values));
+    }
+
+    auto encoding = header.data_page_header.encoding;
+    // change the deprecated encoding to RLE_DICTIONARY
+    if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
+        encoding = tparquet::Encoding::RLE_DICTIONARY;
+    }
+    // Drop the previous page's decoder first so a stale decoder can never be
+    // applied to this page, then propagate a creation failure instead of
+    // dereferencing a missing decoder.
+    _page_decoder.reset();
+    RETURN_IF_ERROR(Decoder::getDecoder(_metadata.type, encoding, _page_decoder));
+    if (_page_decoder == nullptr) {
+        return Status::InternalError("Unsupported encoding {} in parquet decoder",
+                                     tparquet::to_string(encoding));
+    }
+    _page_decoder->set_data(&_page_data);
+    if (_type_length > 0) {
+        _page_decoder->set_type_length(_type_length);
+    }
+
+    return Status::OK();
+}
+
+// Position the page reader on the dictionary page and read its header.
+// Returns the page reader's error when the header cannot be read, and
+// Corruption when the page at the dictionary offset is not a dictionary page.
+Status ColumnChunkReader::_decode_dict_page() {
+    int64_t dict_offset = _metadata.dictionary_page_offset;
+    _page_reader->seek_to_page(dict_offset);
+    // Propagate header read failures instead of inspecting a header that was
+    // never parsed.
+    RETURN_IF_ERROR(_page_reader->next_page());
+    const tparquet::PageHeader& header = *_page_reader->get_page_header();
+    // Checked at runtime: a debug-only assertion lets a malformed file through
+    // in release builds.
+    if (UNLIKELY(header.type != tparquet::PageType::DICTIONARY_PAGE)) {
+        return Status::Corruption("Wrong parquet dictionary page type");
+    }
+    // TODO(gaoxin): decode dictionary page
+    return Status::OK();
+}
+
+// Make sure the decompression buffer can hold `size` bytes. The buffer only
+// grows; a new one is allocated with the size rounded up to a power of two.
+void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
+    if (size <= _decompress_buf_size) {
+        return;
+    }
+    _decompress_buf_size = BitUtil::next_power_of_two(size);
+    _decompress_buf.reset(new uint8_t[_decompress_buf_size]);
+}
+
+// Skip `num_values` values of the current page. Fails with IOError when the
+// page has fewer values left.
+Status ColumnChunkReader::skip_values(size_t num_values) {
+    const bool within_page = _num_values >= num_values;
+    if (UNLIKELY(!within_page)) {
+        return Status::IOError("Skip too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->skip_values(num_values);
 }
 
-Status ColumnChunkReader::read_min_max_stat() {
-    return Status();
+// Decode up to `n` repetition levels of the current page into `levels` and
+// return the number decoded. Only valid for columns whose maximum repetition
+// level is greater than 0.
+size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
+    DCHECK_GT(_max_rep_level, 0);
+    // Repetition levels come from the repetition-level decoder.
+    return _rep_level_decoder.get_levels(levels, n);
 }
 
-Status ColumnChunkReader::decode_dict_page() {
-    return Status();
+// Decode up to `n` definition levels of the current page into `levels` and
+// return the number decoded. Only valid for columns whose maximum definition
+// level is greater than 0.
+size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
+    DCHECK_GT(_max_def_level, 0);
+    // Definition levels come from the definition-level decoder.
+    return _def_level_decoder.get_levels(levels, n);
 }
 
-Status ColumnChunkReader::decode_nested_page() {
-    return Status();
+// Decode `num_values` values of the current page into `doris_column`. Fails
+// with IOError when the page has fewer values left.
+Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t num_values) {
+    const bool within_page = _num_values >= num_values;
+    if (UNLIKELY(!within_page)) {
+        return Status::IOError("Decode too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->decode_values(doris_column, num_values);
 }
 
-Status ColumnChunkReader::read_next_page() {
-    return Status();
+// Decode `num_values` values of the current page into `slice`. Fails with
+// IOError when the page has fewer values left.
+Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
+    const bool within_page = _num_values >= num_values;
+    if (UNLIKELY(!within_page)) {
+        return Status::IOError("Decode too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->decode_values(slice, num_values);
 }
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index dac287096a..c3b58be5c0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -16,20 +16,119 @@
 // under the License.
 
 #pragma once
-#include <common/status.h>
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "io/buffered_reader.h"
+#include "level_decoder.h"
+#include "parquet_common.h"
+#include "schema_desc.h"
+#include "util/block_compression.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_nullable.h"
+#include "vparquet_page_reader.h"
 
 namespace doris::vectorized {
 
+/**
+ * Read and decode parquet column data into doris block column.
+ * <p>Usage:</p>
+ * // Create chunk reader
+ * ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
+ *                                tparquet::ColumnChunk* column_chunk,
+ *                                FieldSchema* fieldSchema);
+ * // Initialize chunk reader, we can set the type length if the length of 
column type is fixed.
+ * // If not set, default value = -1, then the decoder will infer the type 
length.
+ * chunk_reader.init();
+ * while (chunk_reader.has_next_page()) {
+ *   // Seek to next page header.  Only read and parse the page header, not 
page data.
+ *   chunk_reader.next_page();
+ *   // Load data to decoder. Load the page data into underlying container.
+ *   // Or, we can call the chunk_reader.skip_page() to skip current page.
+ *   chunk_reader.load_page_data();
+ *   // Decode values into column or slice.
+ *   // Or, we can call chunk_reader.skip_values(num_values) to skip some 
values.
+ *   chunk_reader.decode_values(slice, num_values);
+ * }
+ */
 class ColumnChunkReader {
 public:
-    Status init();
-    Status read_next_page();
+    ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* 
column_chunk,
+                      FieldSchema* fieldSchema);
+    ~ColumnChunkReader() = default;
+
+    // Initialize chunk reader, will generate the decoder and codec.
+    // We can set the type_length if the length of the column type is fixed,
+    // or not set, the decoder will try to infer the type_length.
+    Status init(size_t type_length = -1); // -1 converts to SIZE_MAX since type_length is size_t
+
+    // Whether the chunk reader has another page to read.
+    bool has_next_page() { return _page_reader->has_next_page(); }
+
+    // Seek to the specific page, page_header_offset must be the start offset 
of the page header.
+    void seek_to_page(int64_t page_header_offset) {
+        _page_reader->seek_to_page(page_header_offset);
+    }
+
+    // Seek to next page. Only read and parse the page header.
+    Status next_page();
+
+    // Skip current page(will not read and parse) if the page is filtered by 
predicates.
+    Status skip_page() { return _page_reader->skip_page(); }
+    // Skip some values(will not read and parse) in current page if the values 
are filtered by predicates.
+    Status skip_values(size_t num_values);
 
-    Status read_min_max_stat();
-    Status decode_dict_page();
-    Status decode_nested_page();
+    // Load page data into the underlying container,
+    // and initialize the repetition and definition level decoder for current 
page data.
+    Status load_page_data();
+    // The remaining number of values in current page. Decreased when reading 
or skipping.
+    uint32_t num_values() const { return _num_values; };
+    // Get the raw data of current page.
+    Slice& get_page_data() { return _page_data; }
+
+    // Get the repetition levels
+    size_t get_rep_levels(level_t* levels, size_t n);
+    // Get the definition levels
+    size_t get_def_levels(level_t* levels, size_t n);
+
+    // Decode values in current page into doris column.
+    Status decode_values(ColumnPtr& doris_column, size_t num_values);
+    // For test, Decode values in current page into slice.
+    Status decode_values(Slice& slice, size_t num_values);
+
+    // Get the repetition level decoder of current page.
+    LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
+    // Get the definition level decoder of current page.
+    LevelDecoder& def_level_decoder() { return _def_level_decoder; }
 
 private:
+    Status _decode_dict_page();
+    void _reserve_decompress_buf(size_t size);
+
+    level_t _max_rep_level;
+    level_t _max_def_level;
+
+    BufferedStreamReader* _stream_reader;
+    // tparquet::ColumnChunk* _column_chunk;
+    tparquet::ColumnMetaData& _metadata;
+    // FieldSchema* _field_schema;
+
+    std::unique_ptr<PageReader> _page_reader = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _block_compress_codec = nullptr;
+
+    LevelDecoder _rep_level_decoder;
+    LevelDecoder _def_level_decoder;
+    uint32_t _num_values = 0;
+    Slice _page_data;
+    std::unique_ptr<uint8_t[]> _decompress_buf;
+    size_t _decompress_buf_size = 0;
+    std::unique_ptr<Decoder> _page_decoder = nullptr;
+    size_t _type_length = -1;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 533caa3db0..f554be169e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -23,12 +23,12 @@
 
 namespace doris::vectorized {
 
-static constexpr int64_t initPageHeaderSize = 1024;
+static constexpr size_t initPageHeaderSize = 1024;
 
-PageReader::PageReader(BufferedStreamReader* reader, int64_t start_offset, 
int64_t length)
-        : _reader(reader), _start_offset(start_offset), 
_end_offset(start_offset + length) {}
+PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t 
length)
+        : _reader(reader), _start_offset(offset), _end_offset(offset + length) 
{}
 
-Status PageReader::next_page(Slice& slice) {
+Status PageReader::next_page() {
     if (_offset < _start_offset || _offset >= _end_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
@@ -37,8 +37,8 @@ Status PageReader::next_page(Slice& slice) {
     }
 
     const uint8_t* page_header_buf = nullptr;
-    int64_t max_size = _end_offset - _offset;
-    int64_t header_size = std::min(initPageHeaderSize, max_size);
+    size_t max_size = _end_offset - _offset;
+    size_t header_size = std::min(initPageHeaderSize, max_size);
     uint32_t real_header_size = 0;
     while (true) {
         header_size = std::min(header_size, max_size);
@@ -49,7 +49,8 @@ Status PageReader::next_page(Slice& slice) {
         if (st.ok()) {
             break;
         }
-        if (_offset + header_size >= _end_offset) {
+        if (_offset + header_size >= _end_offset ||
+            real_header_size > config::parquet_header_max_size) {
             return Status::IOError("Failed to deserialize parquet page 
header");
         }
         header_size <<= 2;
@@ -57,11 +58,26 @@ Status PageReader::next_page(Slice& slice) {
 
     _offset += real_header_size;
     _next_header_offset = _offset + _cur_page_header.compressed_page_size;
+    return Status::OK();
+}
+
+Status PageReader::skip_page() {
+    if (_offset == _next_header_offset) { // equal offsets are treated as "no header parsed yet"
+        return Status::InternalError("Should call next_page() to generate page 
header");
+    }
+    _offset = _next_header_offset; // jump over the page body without reading it
+    return Status::OK();
+}
 
+Status PageReader::get_page_date(Slice& slice) {
+    if (_offset == _next_header_offset) { // equal offsets are treated as "no header parsed yet"
+        return Status::InternalError("Should call next_page() to generate page 
header");
+    }
     slice.size = _cur_page_header.compressed_page_size; // request the compressed page body
     RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
     DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size);
     _offset += slice.size; // advance past the bytes just read
     return Status::OK();
 }
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 523098cc44..cf95812ead 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -29,15 +29,19 @@ namespace doris::vectorized {
 class PageReader {
 public:
 public:
-    PageReader(BufferedStreamReader* reader, int64_t start_offset, int64_t 
length);
+    PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length);
     ~PageReader() = default;
 
     bool has_next_page() const { return _offset < _end_offset; }
 
-    Status next_page(Slice& slice);
+    Status next_page();
+
+    Status skip_page();
 
     const tparquet::PageHeader* get_page_header() const { return 
&_cur_page_header; }
 
+    Status get_page_date(Slice& slice);
+
     void seek_to_page(int64_t page_header_offset) {
         _offset = page_header_offset;
         _next_header_offset = page_header_offset;
@@ -47,11 +51,11 @@ private:
     BufferedStreamReader* _reader;
     tparquet::PageHeader _cur_page_header;
 
-    int64_t _offset = 0;
-    int64_t _next_header_offset = 0;
+    uint64_t _offset = 0;
+    uint64_t _next_header_offset = 0;
 
-    int64_t _start_offset = 0;
-    int64_t _end_offset = 0;
+    uint64_t _start_offset = 0;
+    uint64_t _end_offset = 0;
 };
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet 
b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet
new file mode 100644
index 0000000000..e4b5820161
Binary files /dev/null and 
b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet differ
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 5100ea32f3..02bcef6261 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -27,6 +27,7 @@
 #include "io/local_file_reader.h"
 #include "util/runtime_profile.h"
 #include "vec/exec/format/parquet/parquet_thrift_util.h"
+#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
 
 namespace doris {
@@ -123,6 +124,60 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
     ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
 }
 
+TEST_F(ParquetThriftReaderTest, column_reader) {
+    // type-decoder.parquet is the part of following table:
+    // create table type-decoder (
+    //   int_col int)
+    // TODO(gaoxin): add more hive types
+    LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    auto st = reader.open();
+    EXPECT_TRUE(st.ok());
+
+    std::shared_ptr<FileMetaData> metaData;
+    parse_thrift_footer(&reader, metaData);
+    tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
+
+    // read the `int_col` column, it's the int-type column, and has ten values:
+    // -1, 2, -3, 4, -5, 6, -7, 8, -9, 10
+    tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[0];
+    tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
+    size_t start_offset = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_size = chunk_meta.total_compressed_size;
+    BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
+
+    FieldDescriptor schema_descriptor;
+    schema_descriptor.parse_from_thrift(t_metadata.schema);
+    auto field_schema = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
+
+    ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, field_schema);
+    constexpr size_t batch_size = 10; // constant, so `data` below is a fixed-size array
+    constexpr size_t int_length = 4;
+    char data[batch_size * int_length];
+    Slice slice(data, batch_size * int_length);
+    ASSERT_TRUE(chunk_reader.init().ok());
+    int64_t int_sum = 0; // signed: the column holds negative values
+    while (chunk_reader.has_next_page()) {
+        // seek to next page header
+        ASSERT_TRUE(chunk_reader.next_page().ok());
+        // load data to decoder
+        ASSERT_TRUE(chunk_reader.load_page_data().ok());
+        while (chunk_reader.num_values() > 0) {
+            size_t num_values = chunk_reader.num_values() < batch_size
+                                        ? chunk_reader.num_values() // decode the whole short tail
+                                        : batch_size;
+            ASSERT_TRUE(chunk_reader.decode_values(slice, num_values).ok());
+            auto out_data = reinterpret_cast<Int32*>(slice.data);
+            for (size_t i = 0; i < num_values; i++) {
+                Int32 value = out_data[i];
+                int_sum += value;
+            }
+        }
+    }
+    ASSERT_EQ(int_sum, 5);
+}
+
 } // namespace vectorized
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to