This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f39f57636b [feature-wip](parquet-reader) update column read model and add page index (#11601)
f39f57636b is described below

commit f39f57636bec019c646b5ed53aa80dff65e59360
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Tue Aug 16 15:04:07 2022 +0800

    [feature-wip](parquet-reader) update column read model and add page index (#11601)
---
 be/src/exprs/expr_context.h                        |   6 +-
 be/src/vec/exec/file_hdfs_scanner.cpp              |  55 +++--
 be/src/vec/exec/file_hdfs_scanner.h                |  19 +-
 be/src/vec/exec/file_scan_node.cpp                 |   3 +
 be/src/vec/exec/format/parquet/parquet_pred_cmp.h  |  30 +--
 .../vec/exec/format/parquet/parquet_thrift_util.h  |   5 -
 .../parquet/vparquet_column_chunk_reader.cpp       |  29 ++-
 .../format/parquet/vparquet_column_chunk_reader.h  |   7 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  85 +++++--
 .../exec/format/parquet/vparquet_column_reader.h   |  64 ++++-
 .../exec/format/parquet/vparquet_file_metadata.cpp |   2 +-
 .../exec/format/parquet/vparquet_file_metadata.h   |   2 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 234 +++---------------
 .../exec/format/parquet/vparquet_group_reader.h    |  81 +------
 .../exec/format/parquet/vparquet_page_index.cpp    |  35 +--
 .../vec/exec/format/parquet/vparquet_page_index.h  |  22 +-
 .../exec/format/parquet/vparquet_page_reader.cpp   |   2 +-
 .../vec/exec/format/parquet/vparquet_page_reader.h |   2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 263 +++++++++++++++++----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  64 +++--
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |   2 +-
 21 files changed, 548 insertions(+), 464 deletions(-)
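
At a high level, this commit replaces the row-group-at-a-time fill path
(RowGroupReader::fill_columns_data) with a batch-oriented read model:
ParquetReader::read_next_batch drives one RowGroupReader per row group, and
each RowGroupReader pulls up to batch_size values from its column readers.
A minimal caller-side sketch, assuming the interfaces introduced in the diff
below (variable names are illustrative; see file_hdfs_scanner.cpp for the
committed wiring):

    // Sketch only: drive the new batch read model, as the HDFS scanner does.
    std::shared_ptr<ParquetReader> reader(new ParquetReader(
            file_reader, num_file_slots, batch_size, range_start_offset, range_size));
    RETURN_IF_ERROR(reader->init_reader(tuple_desc, file_slot_descs, conjunct_ctxs, timezone));
    bool eof = false;
    while (!eof) {
        // Fills up to batch_size rows and advances to the next row group
        // internally once the current one is exhausted.
        RETURN_IF_ERROR(reader->read_next_batch(block, &eof));
    }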

diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 26d655a2e2..b1df684a9d 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -27,7 +27,7 @@
 #include "exprs/expr_value.h"
 #include "exprs/slot_ref.h"
 #include "udf/udf.h"
-#include "vec/exec/format/parquet/vparquet_group_reader.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 
 #undef USING_DORIS_UDF
 #define USING_DORIS_UDF using namespace doris_udf
@@ -38,7 +38,7 @@ namespace doris {
 
 namespace vectorized {
 class VOlapScanNode;
-class RowGroupReader;
+class ParquetReader;
 } // namespace vectorized
 
 class Expr;
@@ -166,7 +166,7 @@ private:
     friend class OlapScanNode;
     friend class EsPredicate;
     friend class RowGroupReader;
-    friend class vectorized::RowGroupReader;
+    friend class vectorized::ParquetReader;
     friend class vectorized::VOlapScanNode;
 
     /// FunctionContexts for each registered expression. The FunctionContexts are created
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp
index 08c5084b6d..ab6401de8b 100644
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ b/be/src/vec/exec/file_hdfs_scanner.cpp
@@ -21,43 +21,62 @@
 
 namespace doris::vectorized {
 
+ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
+                                               const TFileScanRangeParams& params,
+                                               const std::vector<TFileRangeDesc>& ranges,
+                                               const std::vector<TExpr>& pre_filter_texprs,
+                                               ScannerCounter* counter)
+        : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
+
 Status ParquetFileHdfsScanner::open() {
+    RETURN_IF_ERROR(FileScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_get_next_reader(_next_range));
     return Status();
 }
 
+void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {}
+
 Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
-    // todo: get block from queue
-    auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
-    if (_next_range >= _ranges.size()) {
-        _scanner_eof = true;
+    if (_next_range >= _ranges.size() || _scanner_eof) {
+        *eof = true;
         return Status::OK();
     }
-    const TFileRangeDesc& range = _ranges[_next_range++];
+    RETURN_IF_ERROR(init_block(block));
+    bool range_eof = false;
+    RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
+    if (range_eof) {
+        RETURN_IF_ERROR(_get_next_reader(_next_range++));
+    }
+    return Status::OK();
+}
+
+Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
+    const TFileRangeDesc& range = _ranges[_next_range];
+    _current_range_offset = range.start_offset;
     std::unique_ptr<FileReader> file_reader;
     RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
                                                     file_reader));
     _reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(),
-                                    range.start_offset, range.size));
+                                    _state->query_options().batch_size, range.start_offset,
+                                    range.size));
+    auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
     Status status =
             _reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone());
     if (!status.ok()) {
-        _scanner_eof = true;
-        return Status::OK();
-    }
-    while (_reader->has_next()) {
-        Status st = _reader->read_next_batch(block);
-        if (st.is_end_of_file()) {
-            break;
+        if (status.is_end_of_file()) {
+            _scanner_eof = true;
+            return Status::OK();
         }
+        return status;
     }
     return Status::OK();
 }
 
-void ParquetFileHdfsScanner::close() {}
-
-void ParquetFileHdfsScanner::_prefetch_batch() {
-    // 1. call file reader next batch
-    // 2. push batch to queue, when get_next is called, pop batch
+void ParquetFileHdfsScanner::close() {
+    FileScanner::close();
 }
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h
index 3db196a83c..e24063c89b 100644
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ b/be/src/vec/exec/file_hdfs_scanner.h
@@ -24,21 +24,34 @@
 
 namespace doris::vectorized {
 
-class HdfsFileScanner : public FileScanner {};
+class HdfsFileScanner : public FileScanner {
+public:
+    HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile,
+                    const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
+                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+            : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {};
+};
 
 class ParquetFileHdfsScanner : public HdfsFileScanner {
 public:
+    ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile,
+                           const TFileScanRangeParams& params,
+                           const std::vector<TFileRangeDesc>& ranges,
+                           const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
     Status open() override;
 
     Status get_next(vectorized::Block* block, bool* eof) override;
-
     void close() override;
 
+protected:
+    void _init_profiles(RuntimeProfile* profile) override;
+
 private:
-    void _prefetch_batch();
+    Status _get_next_reader(int _next_range);
 
 private:
     std::shared_ptr<ParquetReader> _reader;
+    int64_t _current_range_offset;
 };
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index c653c84a15..ec8a5165ce 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -30,6 +30,7 @@
 #include "util/thread.h"
 #include "util/types.h"
 #include "vec/exec/file_arrow_scanner.h"
+#include "vec/exec/file_hdfs_scanner.h"
 #include "vec/exec/file_text_scanner.h"
 #include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vexpr.h"
@@ -471,6 +472,8 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
     case TFileFormatType::FORMAT_PARQUET:
         scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
                                        scan_range.ranges, _pre_filter_texprs, counter);
+        //        scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params,
+        //                                       scan_range.ranges, _pre_filter_texprs, counter);
         break;
     case TFileFormatType::FORMAT_ORC:
         scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params,
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index b07f9afbe9..b58701418e 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -79,8 +79,8 @@ namespace doris::vectorized {
         return true;                                                       \
     }
 
-bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
-                                  const char* min_bytes, const char* max_bytes) {
+bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
+                  const char* min_bytes, const char* max_bytes) {
     switch (conjunct_type) {
     case TYPE_TINYINT: {
         _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
@@ -125,8 +125,8 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*
     return false;
 }
 
-void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
-                                        const char* max_bytes, bool& need_filter) {
+void ParquetReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
+                                       const char* max_bytes, bool& need_filter) {
     Expr* conjunct = ctx->root();
     std::vector<void*> in_pred_values;
     const InPredicate* pred = static_cast<const InPredicate*>(conjunct);
@@ -150,8 +150,8 @@ void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
     }
 }
 
-bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
-                              const char* max_bytes) {
+bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
+              const char* max_bytes) {
     switch (conjunct_type) {
     case TYPE_TINYINT: {
         _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
@@ -200,7 +200,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch
     return false;
 }
 
-bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
+bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
     switch (conjunct_type) {
     case TYPE_TINYINT: {
         _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
@@ -250,7 +250,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch
     return false;
 }
 
-bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
+bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
     switch (conjunct_type) {
     case TYPE_TINYINT: {
         _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
@@ -300,7 +300,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch
     return false;
 }
 
-bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
+bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
     switch (conjunct_type) {
     case TYPE_TINYINT: {
         _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
@@ -350,7 +350,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch
     return false;
 }
 
-bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
+bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
     switch (conjunct_type) {
     case TYPE_TINYINT: {
         _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
@@ -400,8 +400,8 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch
     return false;
 }
 
-void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes,
-                                            const char* max_bytes, bool& need_filter) {
+void ParquetReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes,
+                                           const char* max_bytes, bool& need_filter) {
     Expr* conjunct = ctx->root();
     Expr* expr = conjunct->get_child(1);
     if (expr == nullptr) {
@@ -433,9 +433,9 @@ void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_by
     }
 }
 
-bool RowGroupReader::_determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
-                                                 const std::string& encoded_min,
-                                                 const std::string& encoded_max) {
+bool ParquetReader::_determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
+                                              const std::string& encoded_min,
+                                              const std::string& encoded_max) {
     const char* min_bytes = encoded_min.data();
     const char* max_bytes = encoded_max.data();
     bool need_filter = false;
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 939500ce97..cb5dc1558b 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -67,9 +67,4 @@ Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file
     RETURN_IF_ERROR(file_metadata->init_schema());
     return Status::OK();
 }
-
-//    Status parse_page_header() {
-//        uint8_t* page_buf;
-//
-//    }
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index a0a21b00ca..751780fbae 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -32,6 +32,7 @@ Status ColumnChunkReader::init() {
                                   ? _metadata.dictionary_page_offset
                                   : _metadata.data_page_offset;
     size_t chunk_size = _metadata.total_compressed_size;
+    VLOG_DEBUG << "create _page_reader";
     _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
 
     if (_metadata.__isset.dictionary_page_offset) {
@@ -43,12 +44,13 @@ Status ColumnChunkReader::init() {
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
 
+    VLOG_DEBUG << "initColumnChunkReader finish";
     return Status::OK();
 }
 
 Status ColumnChunkReader::next_page() {
-    RETURN_IF_ERROR(_page_reader->next_page());
-    _num_values = _page_reader->get_page_header()->data_page_header.num_values;
+    RETURN_IF_ERROR(_page_reader->next_page_header());
+    _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values;
     return Status::OK();
 }
 
@@ -72,12 +74,12 @@ Status ColumnChunkReader::load_page_data() {
     if (_max_rep_level > 0) {
         RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
                                                 header.data_page_header.repetition_level_encoding,
-                                                _max_rep_level, _num_values));
+                                                _max_rep_level, _remaining_num_values));
     }
     if (_max_def_level > 0) {
         RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
                                                 header.data_page_header.definition_level_encoding,
-                                                _max_def_level, _num_values));
+                                                _max_def_level, _remaining_num_values));
     }
 
     auto encoding = header.data_page_header.encoding;
@@ -85,6 +87,7 @@ Status ColumnChunkReader::load_page_data() {
     if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
         encoding = tparquet::Encoding::RLE_DICTIONARY;
     }
+
     // Reuse page decoder
     if (_decoders.find(static_cast<int>(encoding)) != _decoders.end()) {
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
@@ -104,7 +107,7 @@ Status ColumnChunkReader::load_page_data() {
 Status ColumnChunkReader::_decode_dict_page() {
     int64_t dict_offset = _metadata.dictionary_page_offset;
     _page_reader->seek_to_page(dict_offset);
-    _page_reader->next_page();
+    _page_reader->next_page_header();
     const tparquet::PageHeader& header = *_page_reader->get_page_header();
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
     // TODO(gaoxin): decode dictionary page
@@ -119,10 +122,10 @@ void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
 }
 
 Status ColumnChunkReader::skip_values(size_t num_values) {
-    if (UNLIKELY(_num_values < num_values)) {
+    if (UNLIKELY(_remaining_num_values < num_values)) {
         return Status::IOError("Skip too many values in current page");
     }
-    _num_values -= num_values;
+    _remaining_num_values -= num_values;
     return _page_decoder->skip_values(num_values);
 }
 
@@ -138,27 +141,27 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
 
 Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
                                         size_t num_values) {
-    if (UNLIKELY(_num_values < num_values)) {
+    if (UNLIKELY(_remaining_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
-    _num_values -= num_values;
+    _remaining_num_values -= num_values;
     return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
 
 Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                         size_t num_values) {
-    if (UNLIKELY(_num_values < num_values)) {
+    if (UNLIKELY(_remaining_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
-    _num_values -= num_values;
+    _remaining_num_values -= num_values;
     return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
 
 Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
-    if (UNLIKELY(_num_values < num_values)) {
+    if (UNLIKELY(_remaining_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
-    _num_values -= num_values;
+    _remaining_num_values -= num_values;
     return _page_decoder->decode_values(slice, num_values);
 }
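
The rename of _num_values to _remaining_num_values above reflects how the
counter is used: it is loaded from the page header by next_page() and
decremented by every decode or skip. A minimal per-page loop, assuming the
ColumnChunkReader interface shown in this diff (the 1024-value batch is
illustrative):

    // Sketch only: per-page value accounting with the renamed counter.
    RETURN_IF_ERROR(chunk_reader.next_page());      // sets _remaining_num_values
    RETURN_IF_ERROR(chunk_reader.load_page_data()); // init level + page decoders
    while (chunk_reader.remaining_num_values() > 0) {
        size_t n = std::min<size_t>(1024, chunk_reader.remaining_num_values());
        // decode_values() checks the counter and decrements it by n.
        RETURN_IF_ERROR(chunk_reader.decode_values(column, data_type, n));
    }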
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index f8510d4b37..b248ba0a51 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -85,10 +85,10 @@ public:
     // and initialize the repetition and definition level decoder for current page data.
     Status load_page_data();
     // The remaining number of values in current page(including null values). Decreased when reading or skipping.
-    uint32_t num_values() const { return _num_values; };
+    uint32_t remaining_num_values() const { return _remaining_num_values; };
     // null values are not analyzing from definition levels
     // the caller should maintain the consistency after analyzing null values from definition levels.
-    void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
+    void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; };
     // Get the raw data of current page.
     Slice& get_page_data() { return _page_data; }
 
@@ -116,6 +116,7 @@ private:
     FieldSchema* _field_schema;
     level_t _max_rep_level;
     level_t _max_def_level;
+    tparquet::LogicalType _parquet_logical_type;
 
     BufferedStreamReader* _stream_reader;
     // tparquet::ColumnChunk* _column_chunk;
@@ -127,7 +128,7 @@ private:
 
     LevelDecoder _rep_level_decoder;
     LevelDecoder _def_level_decoder;
-    uint32_t _num_values = 0;
+    uint32_t _remaining_num_values = 0;
     Slice _page_data;
     std::unique_ptr<uint8_t[]> _decompress_buf;
     size_t _decompress_buf_size = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 547dfba3bd..e7b189e40c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -19,50 +19,83 @@
 
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
+#include <vec/columns/columns_number.h>
 
 #include "schema_desc.h"
 #include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
-Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field,
-                                const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
-                                int64_t chunk_size) {
-    // todo1: init column chunk reader
-    // BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
-    // _chunk_reader(&stream_reader, chunk, field);
-    // _chunk_reader.init();
-    return Status();
-}
-
-Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
-                                   const FieldSchema* field, const ParquetReadColumn& column,
-                                   const TypeDescriptor& col_type,
+Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+                                   const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
-                                   const ParquetColumnReader* reader) {
+                                   std::vector<RowRange>& row_ranges,
+                                   std::unique_ptr<ParquetColumnReader>& reader) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
         return Status::Corruption("not supported type");
     }
     if (field->type.type == TYPE_ARRAY) {
         return Status::Corruption("not supported array type yet");
     } else {
+        VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index;
+        tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
-        RETURN_IF_ERROR(scalar_reader->init(file, field,
-                                            &row_group.columns[field->physical_column_index],
-                                            col_type, chunk_size));
-        reader = scalar_reader;
+        scalar_reader->init_column_metadata(chunk);
+        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+        reader.reset(scalar_reader);
     }
     return Status::OK();
 }
 
-Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta,
-                                            ColumnPtr* data) {
-    // todo2: read data with chunk reader to load page data
-    // while (_chunk_reader.has_next) {
-    // _chunk_reader.next_page();
-    // _chunk_reader.load_page_data();
-    // }
-    return Status();
+void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chunk) {
+    auto chunk_meta = chunk.meta_data;
+    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_len = chunk_meta.total_compressed_size;
+    _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta));
+}
+
+void ParquetColumnReader::_skipped_pages() {}
+
+Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
+                                std::vector<RowRange>& row_ranges) {
+    BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size());
+    _row_ranges.reset(&row_ranges);
+    _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
+    _chunk_reader->init();
+    return Status::OK();
+}
+
+Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                                            size_t batch_size, size_t* read_rows, bool* eof) {
+    if (_chunk_reader->remaining_num_values() <= 0) {
+        // seek to next page header
+        _chunk_reader->next_page();
+        if (_row_ranges->size() != 0) {
+            _skipped_pages();
+        }
+        // load data to decoder
+        _chunk_reader->load_page_data();
+    }
+    size_t read_values = _chunk_reader->remaining_num_values() < batch_size
+                                 ? _chunk_reader->remaining_num_values()
+                                 : batch_size;
+    *read_rows = read_values;
+    WhichDataType which_type(type);
+    switch (_metadata->t_metadata().type) {
+    case tparquet::Type::INT32: {
+        _chunk_reader->decode_values(doris_column, type, read_values);
+        return Status::OK();
+    }
+    case tparquet::Type::INT64: {
+        // todo: test int64
+        return Status::OK();
+    }
+    default:
+        return Status::Corruption("unsupported parquet data type");
+    }
+    return Status::OK();
 }
 
 void ScalarColumnReader::close() {}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 68a38607e6..696fbe5db0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -20,36 +20,78 @@
 #include <gen_cpp/parquet_types.h>
 
 #include "schema_desc.h"
+#include "vparquet_column_chunk_reader.h"
 #include "vparquet_reader.h"
-//#include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
+struct RowRange;
 class ParquetReadColumn;
 
+class ParquetColumnMetadata {
+public:
+    ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length,
+                          tparquet::ColumnMetaData metadata)
+            : _chunk_start_offset(chunk_start_offset),
+              _chunk_length(chunk_length),
+              _metadata(metadata) {};
+
+    ~ParquetColumnMetadata() = default;
+    int64_t start_offset() const { return _chunk_start_offset; };
+    int64_t size() const { return _chunk_length; };
+    tparquet::ColumnMetaData t_metadata() { return _metadata; };
+
+private:
+    int64_t _chunk_start_offset;
+    int64_t _chunk_length;
+    tparquet::ColumnMetaData _metadata;
+};
+
 class ParquetColumnReader {
 public:
     ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {};
-    virtual ~ParquetColumnReader() = 0;
-    virtual Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) = 0;
-    static Status create(const FileReader* file, int64_t chunk_size, const FieldSchema* field,
-                         const ParquetReadColumn& column, const TypeDescriptor& col_type,
-                         const tparquet::RowGroup& row_group, const ParquetColumnReader* reader);
+    virtual ~ParquetColumnReader() = default;
+    virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
+                                    size_t* read_rows, bool* eof) = 0;
+    static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
+                         const tparquet::RowGroup& row_group, std::vector<RowRange>& row_ranges,
+                         std::unique_ptr<ParquetColumnReader>& reader);
+    void init_column_metadata(const tparquet::ColumnChunk& chunk);
     virtual void close() = 0;
 
+protected:
+    void _skipped_pages();
+
 protected:
     const ParquetReadColumn& _column;
-    //    const ColumnChunkReader& _chunk_reader;
+    std::unique_ptr<ParquetColumnMetadata> _metadata;
+    std::unique_ptr<std::vector<RowRange>> _row_ranges;
 };
 
 class ScalarColumnReader : public ParquetColumnReader {
 public:
     ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {};
     ~ScalarColumnReader() override = default;
-    Status init(const FileReader* file, const FieldSchema* field,
-                const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
-                int64_t chunk_size);
-    Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) override;
+    Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
+                std::vector<RowRange>& row_ranges);
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
+                            size_t* read_rows, bool* eof) override;
     void close() override;
+
+private:
+    std::unique_ptr<ColumnChunkReader> _chunk_reader;
 };
+
+//class ArrayColumnReader : public ParquetColumnReader {
+//public:
+//    ArrayColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {};
+//    ~ArrayColumnReader() override = default;
+//    Status init(FileReader* file, FieldSchema* field,
+//                tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
+//                int64_t chunk_size);
+//    Status read_column_data(ColumnPtr* data) override;
+//    void close() override;
+//private:
+//    std::unique_ptr<ColumnChunkReader> _chunk_reader;
+//};
 }; // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
index 445ce76318..4e413ec9e9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
@@ -39,7 +39,7 @@ Status FileMetaData::init_schema() {
     return Status();
 }
 
-const tparquet::FileMetaData& FileMetaData::to_thrift_metadata() {
+tparquet::FileMetaData& FileMetaData::to_thrift_metadata() {
     return _metadata;
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 53d08fa855..1f4727242d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -27,7 +27,7 @@ public:
     FileMetaData(tparquet::FileMetaData& metadata);
     ~FileMetaData() = default;
     Status init_schema();
-    const tparquet::FileMetaData& to_thrift_metadata();
+    tparquet::FileMetaData& to_thrift_metadata();
     int32_t num_row_groups() const { return _num_groups; }
     int32_t num_columns() const { return _num_columns; };
     int32_t num_rows() const { return _num_rows; };
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 25b7819e8f..751e43863a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -24,235 +24,61 @@
 namespace doris::vectorized {
 
 RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
-                               const std::shared_ptr<FileMetaData>& file_metadata,
                                const std::vector<ParquetReadColumn>& read_columns,
-                               const std::map<std::string, int>& map_column,
-                               const std::vector<ExprContext*>& conjunct_ctxs)
+                               const int32_t row_group_id, tparquet::RowGroup& row_group)
         : _file_reader(file_reader),
-          _file_metadata(file_metadata),
           _read_columns(read_columns),
-          _map_column(map_column),
-          _conjunct_ctxs(conjunct_ctxs),
-          _current_row_group(-1) {}
+          _row_group_id(row_group_id),
+          _row_group_meta(row_group),
+          _total_rows(row_group.num_rows) {}
 
 RowGroupReader::~RowGroupReader() {
-    for (auto& column_reader : _column_readers) {
-        auto reader = column_reader.second;
-        reader->close();
-        delete reader;
-        reader = nullptr;
-    }
     _column_readers.clear();
 }
 
-Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t split_start_offset,
-                            int64_t split_size) {
-    _tuple_desc = tuple_desc;
-    _split_start_offset = split_start_offset;
-    _split_size = split_size;
-    _init_conjuncts(tuple_desc, _conjunct_ctxs);
-    RETURN_IF_ERROR(_init_column_readers());
+Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges) {
+    VLOG_DEBUG << "Row group id: " << _row_group_id;
+    RETURN_IF_ERROR(_init_column_readers(schema, row_ranges));
     return Status::OK();
 }
 
-void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
-                                     const std::vector<ExprContext*>& conjunct_ctxs) {
-    if (tuple_desc->slots().empty()) {
-        return;
-    }
-    for (auto& read_col : _read_columns) {
-        _parquet_column_ids.emplace(read_col.parquet_column_id);
-    }
-
-    for (int i = 0; i < tuple_desc->slots().size(); i++) {
-        auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
-        if (col_iter == _map_column.end()) {
-            continue;
-        }
-        int parquet_col_id = col_iter->second;
-        if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
-            continue;
-        }
-        for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
-            Expr* conjunct = conjunct_ctxs[conj_idx]->root();
-            if (conjunct->get_num_children() == 0) {
-                continue;
-            }
-            Expr* raw_slot = conjunct->get_child(0);
-            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
-                continue;
-            }
-            SlotRef* slot_ref = (SlotRef*)raw_slot;
-            SlotId conjunct_slot_id = slot_ref->slot_id();
-            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
-                // Get conjuncts by conjunct_slot_id
-                auto iter = _slot_conjuncts.find(conjunct_slot_id);
-                if (_slot_conjuncts.end() == iter) {
-                    std::vector<ExprContext*> conjuncts;
-                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
-                } else {
-                    std::vector<ExprContext*> conjuncts = iter->second;
-                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                }
-            }
-        }
-    }
-}
-
-Status RowGroupReader::_init_column_readers() {
+Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
+                                            std::vector<RowRange>& row_ranges) {
     for (auto& read_col : _read_columns) {
         SlotDescriptor* slot_desc = read_col.slot_desc;
-        FieldDescriptor schema = _file_metadata->schema();
         TypeDescriptor col_type = slot_desc->type();
-        const auto& field = schema.get_column(slot_desc->col_name());
-        const tparquet::RowGroup row_group =
-                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        ParquetColumnReader* reader = nullptr;
-        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, MAX_PARQUET_BLOCK_SIZE, field,
-                                                    read_col, slot_desc->type(), row_group,
-                                                    reader));
+        auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
+        VLOG_DEBUG << "field: " << field->debug_string();
+        std::unique_ptr<ParquetColumnReader> reader;
+        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
+                                                    row_ranges, reader));
         if (reader == nullptr) {
+            VLOG_DEBUG << "Init row group reader failed";
             return Status::Corruption("Init row group reader failed");
         }
-        _column_readers[slot_desc->id()] = reader;
+        _column_readers[slot_desc->id()] = std::move(reader);
     }
     return Status::OK();
 }
 
-Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) {
-    // get ColumnWithTypeAndName from src_block
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
+    if (_read_rows >= _total_rows) {
+        *_batch_eof = true;
+    }
     for (auto& read_col : _read_columns) {
-        const tparquet::RowGroup row_group =
-                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        auto& column_with_type_and_name = block->get_by_name(read_col.slot_desc->col_name());
-        RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data(
-                row_group, &column_with_type_and_name.column));
-        VLOG_DEBUG << column_with_type_and_name.name;
+        auto slot_desc = read_col.slot_desc;
+        auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
+        auto column_ptr = column_with_type_and_name.column;
+        auto column_type = column_with_type_and_name.type;
+        size_t batch_read_rows = 0;
+        RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
+                column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof));
+        _read_rows += batch_read_rows;
+        VLOG_DEBUG << "read column: " << column_with_type_and_name.name;
+        VLOG_DEBUG << "read rows in column: " << batch_read_rows;
     }
     // use data fill utils read column data to column ptr
     return Status::OK();
 }
 
-Status RowGroupReader::get_next_row_group(const int32_t* group_id) {
-    int32_t total_group = _file_metadata->num_row_groups();
-    if (total_group == 0 || _file_metadata->num_rows() == 0 || _split_size < 0) {
-        return Status::EndOfFile("No row group need read");
-    }
-    while (_current_row_group < total_group) {
-        _current_row_group++;
-        const tparquet::RowGroup& row_group =
-                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        if (!_is_misaligned_range_group(row_group)) {
-            continue;
-        }
-        bool filter_group = false;
-        RETURN_IF_ERROR(_process_row_group_filter(row_group, _conjunct_ctxs, &filter_group));
-        if (!filter_group) {
-            group_id = &_current_row_group;
-            break;
-        }
-    }
-    return Status::OK();
-}
-
-bool RowGroupReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) {
-    int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data);
-
-    auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data;
-    int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size;
-
-    int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
-    if (!(row_group_mid >= _split_start_offset &&
-          row_group_mid < _split_start_offset + _split_size)) {
-        return true;
-    }
-    return false;
-}
-
-Status RowGroupReader::_process_row_group_filter(const tparquet::RowGroup& row_group,
-                                                 const std::vector<ExprContext*>& conjunct_ctxs,
-                                                 bool* filter_group) {
-    _process_column_stat_filter(row_group, conjunct_ctxs, filter_group);
-    _init_chunk_dicts();
-    RETURN_IF_ERROR(_process_dict_filter(filter_group));
-    _init_bloom_filter();
-    RETURN_IF_ERROR(_process_bloom_filter(filter_group));
-    return Status::OK();
-}
-
-Status RowGroupReader::_process_column_stat_filter(const tparquet::RowGroup& row_group,
-                                                   const std::vector<ExprContext*>& conjunct_ctxs,
-                                                   bool* filter_group) {
-    int total_group = _file_metadata->num_row_groups();
-    // It will not filter if head_group_offset equals tail_group_offset
-    int64_t total_rows = 0;
-    int64_t total_bytes = 0;
-    for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
-        total_rows += row_group.num_rows;
-        total_bytes += row_group.total_byte_size;
-        for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) {
-            const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name();
-            auto col_iter = _map_column.find(col_name);
-            if (col_iter == _map_column.end()) {
-                continue;
-            }
-            int parquet_col_id = col_iter->second;
-            if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
-                // Column not exist in parquet file
-                continue;
-            }
-            auto slot_iter = _slot_conjuncts.find(slot_id);
-            if (slot_iter == _slot_conjuncts.end()) {
-                continue;
-            }
-            auto statistic = row_group.columns[parquet_col_id].meta_data.statistics;
-            if (!statistic.__isset.max || !statistic.__isset.min) {
-                continue;
-            }
-            // Min-max of statistic is plain-encoded value
-            *filter_group =
-                    _determine_filter_row_group(slot_iter->second, statistic.min, statistic.max);
-            if (*filter_group) {
-                _filtered_num_row_groups++;
-                VLOG_DEBUG << "Filter row group id: " << row_group_id;
-                break;
-            }
-        }
-    }
-    VLOG_DEBUG << "DEBUG total_rows: " << total_rows;
-    VLOG_DEBUG << "DEBUG total_bytes: " << total_bytes;
-    VLOG_DEBUG << "Parquet file: " << _file_metadata->schema().debug_string()
-               << ", Num of read row group: " << total_group
-               << ", and num of skip row group: " << _filtered_num_row_groups;
-    return Status::OK();
-}
-
-void RowGroupReader::_init_chunk_dicts() {}
-
-Status RowGroupReader::_process_dict_filter(bool* filter_group) {
-    return Status();
-}
-
-void RowGroupReader::_init_bloom_filter() {}
-
-Status RowGroupReader::_process_bloom_filter(bool* filter_group) {
-    RETURN_IF_ERROR(_file_reader->seek(0));
-    return Status();
-}
-
-int64_t RowGroupReader::_get_row_group_start_offset(const tparquet::RowGroup& row_group) {
-    if (row_group.__isset.file_offset) {
-        return row_group.file_offset;
-    }
-    return row_group.columns[0].meta_data.data_page_offset;
-}
-
-int64_t RowGroupReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
-    if (column.__isset.dictionary_page_offset) {
-        DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
-        return column.dictionary_page_offset;
-    }
-    return column.data_page_offset;
-}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index b69852f124..ea9eeed342 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -24,87 +24,30 @@
 #include "vparquet_file_metadata.h"
 #include "vparquet_reader.h"
 
-#define MAX_PARQUET_BLOCK_SIZE 1024
-
 namespace doris::vectorized {
 class ParquetReadColumn;
 class ParquetColumnReader;
+struct RowRange;
+
 class RowGroupReader {
 public:
     RowGroupReader(doris::FileReader* file_reader,
-                   const std::shared_ptr<FileMetaData>& file_metadata,
-                   const std::vector<ParquetReadColumn>& read_columns,
-                   const std::map<std::string, int>& map_column,
-                   const std::vector<ExprContext*>& conjunct_ctxs);
+                   const std::vector<ParquetReadColumn>& read_columns, const int32_t _row_group_id,
+                   tparquet::RowGroup& row_group);
     ~RowGroupReader();
-    Status init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, int64_t split_size);
-    Status get_next_row_group(const int32_t* group_id);
-    Status fill_columns_data(Block* block, const int32_t group_id);
+    Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges);
+    Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
 
 private:
-    bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
-
-    Status _process_column_stat_filter(const tparquet::RowGroup& row_group,
-                                       const std::vector<ExprContext*>& conjunct_ctxs,
-                                       bool* filter_group);
-
-    void _init_conjuncts(const TupleDescriptor* tuple_desc,
-                         const std::vector<ExprContext*>& conjunct_ctxs);
-
-    Status _init_column_readers();
-
-    Status _process_row_group_filter(const tparquet::RowGroup& row_group,
-                                     const std::vector<ExprContext*>& conjunct_ctxs,
-                                     bool* filter_group);
-
-    void _init_chunk_dicts();
-
-    Status _process_dict_filter(bool* filter_group);
-
-    void _init_bloom_filter();
-
-    Status _process_bloom_filter(bool* filter_group);
-
-    int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group);
-    int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column);
-
-    bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
-                                     const std::string& encoded_min,
-                                     const std::string& encoded_max);
-
-    void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
-                                bool& need_filter);
-
-    void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
-                            bool& need_filter);
-
-    bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
-                      const char* min_bytes, const char* max_bytes);
-
-    bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
-                  const char* max_bytes);
-
-    bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes);
-
-    bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes);
-
-    bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes);
-
-    bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
+    Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges);
 
 private:
     doris::FileReader* _file_reader;
-    const std::shared_ptr<FileMetaData>& _file_metadata;
-    std::unordered_map<int32_t, ParquetColumnReader*> _column_readers;
-    const TupleDescriptor* _tuple_desc; // get all slot info
+    std::unordered_map<int32_t, std::unique_ptr<ParquetColumnReader>> _column_readers;
     const std::vector<ParquetReadColumn>& _read_columns;
-    const std::map<std::string, int>& _map_column;
-    std::unordered_set<int> _parquet_column_ids;
-    const std::vector<ExprContext*>& _conjunct_ctxs;
-    std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
-    int64_t _split_start_offset;
-    int64_t _split_size;
-    int32_t _current_row_group;
-    int32_t _filtered_num_row_groups = 0;
+    const int32_t _row_group_id;
+    tparquet::RowGroup& _row_group_meta;
+    int64_t _read_rows = 0;
+    int64_t _total_rows;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index 6365ec2163..40df65ace6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -21,22 +21,22 @@
 
 namespace doris::vectorized {
 
-PageIndex::~PageIndex() {
-    if (_column_index != nullptr) {
-        delete _column_index;
-        _column_index = nullptr;
+Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
+                                           int total_rows_of_group, int page_idx,
+                                           RowRange* row_range) {
+    const auto& page_locations = offset_index.page_locations;
+    DCHECK_LT(page_idx, page_locations.size());
+    row_range->first_row = page_locations[page_idx].first_row_index;
+    if (page_idx == page_locations.size() - 1) {
+        row_range->last_row = total_rows_of_group - 1;
+    } else {
+        row_range->last_row = page_locations[page_idx + 1].first_row_index - 1;
     }
-    if (_offset_index != nullptr) {
-        delete _offset_index;
-        _offset_index = nullptr;
-    }
-}
-
-Status PageIndex::get_row_range_for_page() {
-    return Status();
+    return Status::OK();
 }
 
-Status PageIndex::collect_skipped_page_range() {
+Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
+                                             std::vector<int> page_range) {
     return Status();
 }
 
@@ -67,20 +67,21 @@ bool PageIndex::check_and_get_page_index_ranges(const std::vector<tparquet::Colu
     return has_page_index;
 }
 
-Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff) {
+Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
+                                     tparquet::ColumnIndex* column_index) {
     int64_t buffer_offset = chunk.column_index_offset - _column_index_start;
     uint32_t length = chunk.column_index_length;
     DCHECK_LE(buffer_offset + length, _column_index_size);
-    RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, _column_index));
+    RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, column_index));
     return Status::OK();
 }
 
 Status PageIndex::parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                                     int64_t buffer_size) {
+                                     int64_t buffer_size, tparquet::OffsetIndex* offset_index) {
     int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start + _column_index_size;
     uint32_t length = chunk.offset_index_length;
     DCHECK_LE(buffer_offset + length, buffer_size);
-    RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, _offset_index));
+    RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, offset_index));
     return Status::OK();
 }
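
In create_skipped_row_range above, a page's row range runs from its own
first_row_index to the next page's first_row_index - 1, with the last page
running to total_rows_of_group - 1. A worked example, assuming a row group of
5000 rows whose offset index has three page_locations with first_row_index
{0, 1000, 3000}:

    // Sketch only: RowRange produced for each page_idx.
    // page_idx 0 -> {first_row = 0,    last_row = 999}
    // page_idx 1 -> {first_row = 1000, last_row = 2999}
    // page_idx 2 -> {first_row = 3000, last_row = 4999}  (last page)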
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index a26074ab0d..5894a4e8d6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -19,30 +19,32 @@
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
 
+#include "exprs/expr_context.h"
+
 namespace doris::vectorized {
+class ParquetReader;
+struct RowRange;
 
 class PageIndex {
 public:
     PageIndex() = default;
-    ~PageIndex();
-    Status get_row_range_for_page();
-    Status collect_skipped_page_range();
+    ~PageIndex() = default;
+    Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int total_rows_of_group,
+                                    int page_idx, RowRange* row_range);
+    Status collect_skipped_page_range(std::vector<ExprContext*> conjuncts,
+                                      std::vector<int> page_range);
     bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
-    Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff);
+    Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
+                              tparquet::ColumnIndex* _column_index);
     Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                              int64_t buffer_size);
+                              int64_t buffer_size, tparquet::OffsetIndex* _offset_index);
 
-private:
 private:
     friend class ParquetReader;
     int64_t _column_index_start;
     int64_t _column_index_size;
     int64_t _offset_index_start;
     int64_t _offset_index_size;
-
-    tparquet::OffsetIndex* _offset_index;
-    tparquet::ColumnIndex* _column_index;
-    // row range define
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index f554be169e..94a291f40e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -28,7 +28,7 @@ static constexpr size_t initPageHeaderSize = 1024;
 PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length)
         : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
 
-Status PageReader::next_page() {
+Status PageReader::next_page_header() {
     if (_offset < _start_offset || _offset >= _end_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index cf95812ead..256ddd13d1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -34,7 +34,7 @@ public:
 
     bool has_next_page() const { return _offset < _end_offset; }
 
-    Status next_page();
+    Status next_page_header();
 
     Status skip_page();
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 815cba3c15..b16df6b557 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -21,13 +21,14 @@
 
 namespace doris::vectorized {
 ParquetReader::ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
-                             int64_t range_start_offset, int64_t range_size)
+                             size_t batch_size, int64_t range_start_offset, int64_t range_size)
         : _num_of_columns_from_file(num_of_columns_from_file),
+          _batch_size(batch_size),
           _range_start_offset(range_start_offset),
           _range_size(range_size) {
     _file_reader = file_reader;
     _total_groups = 0;
-    //    _current_group = 0;
+    _current_row_group_id = 0;
     //        _statistics = std::make_shared<Statistics>();
 }
 
@@ -36,6 +37,10 @@ ParquetReader::~ParquetReader() {
 }
 
 void ParquetReader::close() {
+    for (auto& conjuncts : _slot_conjuncts) {
+        conjuncts.second.clear();
+    }
+    _slot_conjuncts.clear();
     if (_file_reader != nullptr) {
         _file_reader->close();
         delete _file_reader;
@@ -45,26 +50,26 @@ void ParquetReader::close() {
 
 Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
                                   const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                                  const std::vector<ExprContext*>& conjunct_ctxs,
+                                  std::vector<ExprContext*>& conjunct_ctxs,
                                   const std::string& timezone) {
     _file_reader->open();
+    _tuple_desc = tuple_desc;
+    // borrowed, not owned: the conjuncts and tuple descriptor stay owned by the caller
+    _conjunct_ctxs = &conjunct_ctxs;
     RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
-    auto metadata = _file_metadata->to_thrift_metadata();
-
-    _total_groups = metadata.row_groups.size();
+    // assumes to_thrift_metadata() returns a reference whose lifetime is tied
+    // to _file_metadata; storing the address of a temporary here would dangle
+    _t_metadata = &_file_metadata->to_thrift_metadata();
+    _total_groups = _file_metadata->num_row_groups();
     if (_total_groups == 0) {
         return Status::EndOfFile("Empty Parquet File");
     }
     auto schema_desc = _file_metadata->schema();
     for (int i = 0; i < _file_metadata->num_columns(); ++i) {
-        LOG(WARNING) << schema_desc.debug_string();
+        VLOG_DEBUG << schema_desc.debug_string();
         // Get the Column Reader for the boolean column
         _map_column.emplace(schema_desc.get_column(i)->name, i);
     }
-    LOG(WARNING) << "";
     RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs));
     RETURN_IF_ERROR(
-            _init_row_group_reader(tuple_desc, _range_start_offset, 
_range_size, conjunct_ctxs));
+            _init_row_group_readers(tuple_desc, _range_start_offset, 
_range_size, conjunct_ctxs));
     return Status::OK();
 }
 
@@ -81,7 +86,7 @@ Status ParquetReader::_init_read_columns(const 
std::vector<SlotDescriptor*>& tup
         } else {
             std::stringstream str_error;
             str_error << "Invalid Column Name:" << slot_desc->col_name();
-            LOG(WARNING) << str_error.str();
+            VLOG_DEBUG << str_error.str();
             return Status::InvalidArgument(str_error.str());
         }
         ParquetReadColumn column;
@@ -90,63 +95,231 @@ Status ParquetReader::_init_read_columns(const 
std::vector<SlotDescriptor*>& tup
         auto physical_type = 
_file_metadata->schema().get_column(parquet_col_id)->physical_type;
         column.parquet_type = physical_type;
         _read_columns.emplace_back(column);
+        VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
     }
     return Status::OK();
 }
 
-Status ParquetReader::read_next_batch(Block* block) {
-    int32_t group_id = 0;
-    RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id));
-    auto metadata = _file_metadata->to_thrift_metadata();
-    auto column_chunks = metadata.row_groups[group_id].columns;
-    if (_has_page_index(column_chunks)) {
-        Status st = _process_page_index(column_chunks);
-        if (st.ok()) {
-            // todo: process filter page
-            return Status::OK();
-        } else {
-            // todo: record profile
-            LOG(WARNING) << "";
+Status ParquetReader::read_next_batch(Block* block, bool* eof) {
+    // _filter_row_groups may select fewer groups than _total_groups, so bound
+    // the cursor by the readers actually created
+    if (_current_row_group_id >= static_cast<int32_t>(_row_group_readers.size())) {
+        *eof = true;
+        return Status::OK();
+    }
+    bool batch_eof = false;
+    auto row_group_reader = _row_group_readers[_current_row_group_id];
+    RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, &batch_eof));
+    if (batch_eof) {
+        // the current row group is exhausted, advance to the next one
+        _current_row_group_id++;
+        if (_current_row_group_id >= static_cast<int32_t>(_row_group_readers.size())) {
+            *eof = true;
+        }
+    }
+    return Status::OK();
+}
+
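[editor's note] Caller-side, the new signature is driven like this (a sketch; reader setup omitted): eof only flips after the last row group reader reports its final batch.

    Block block;
    bool eof = false;
    while (!eof) {
        RETURN_IF_ERROR(reader.read_next_batch(&block, &eof));
        // hand the filled block to the scan node
    }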
+Status ParquetReader::_init_row_group_readers(const TupleDescriptor* 
tuple_desc,
+                                              int64_t range_start_offset, 
int64_t range_size,
+                                              const std::vector<ExprContext*>& 
conjunct_ctxs) {
+    std::vector<int32_t> read_row_groups;
+    RETURN_IF_ERROR(_filter_row_groups(&read_row_groups));
+    _init_conjuncts(tuple_desc, conjunct_ctxs);
+    for (auto row_group_id : read_row_groups) {
+        VLOG_DEBUG << "_has_page_index";
+        auto row_group = _t_metadata->row_groups[row_group_id];
+        auto column_chunks = row_group.columns;
+        std::vector<RowRange> skipped_row_ranges;
+        if (_has_page_index(column_chunks)) {
+            VLOG_DEBUG << "_process_page_index";
+            RETURN_IF_ERROR(_process_page_index(row_group, 
skipped_row_ranges));
         }
+        auto row_group_reader = std::make_shared<RowGroupReader>(_file_reader, _read_columns,
+                                                                 row_group_id, row_group);
+        // todo: can filter row with candidate ranges rather than skipped 
ranges
+        RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), 
skipped_row_ranges));
+        _row_group_readers.emplace_back(row_group_reader);
     }
-    // metadata has been processed, fill parquet data to block
-    // block is the batch data of a row group. a row group has N batch
-    // push to scanner queue
-    _fill_block_data(block, group_id);
+    VLOG_DEBUG << "_init_row_group_reader finished";
     return Status::OK();
 }
 
-void ParquetReader::_fill_block_data(Block* block, int group_id) {
-    // make and init src block here
-    // read column chunk
-    _row_group_reader->fill_columns_data(block, group_id);
+void ParquetReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
+                                    const std::vector<ExprContext*>& 
conjunct_ctxs) {
+    if (tuple_desc->slots().empty()) {
+        return;
+    }
+    std::unordered_set<int> parquet_col_ids(_include_column_ids.begin(), 
_include_column_ids.end());
+    for (int i = 0; i < tuple_desc->slots().size(); i++) {
+        auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
+        if (col_iter == _map_column.end()) {
+            continue;
+        }
+        int parquet_col_id = col_iter->second;
+        if (parquet_col_ids.end() == parquet_col_ids.find(parquet_col_id)) {
+            continue;
+        }
+        for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
+            Expr* conjunct = conjunct_ctxs[conj_idx]->root();
+            if (conjunct->get_num_children() == 0) {
+                continue;
+            }
+            Expr* raw_slot = conjunct->get_child(0);
+            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
+                continue;
+            }
+            SlotRef* slot_ref = (SlotRef*)raw_slot;
+            SlotId conjunct_slot_id = slot_ref->slot_id();
+            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
+                // Get conjuncts by conjunct_slot_id
+                auto iter = _slot_conjuncts.find(conjunct_slot_id);
+                if (_slot_conjuncts.end() == iter) {
+                    std::vector<ExprContext*> conjuncts;
+                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
+                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, 
conjuncts));
+                } else {
+                    // append to the vector stored in the map, not to a copy
+                    iter->second.emplace_back(conjunct_ctxs[conj_idx]);
+                }
+            }
+        }
+    }
 }
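[editor's note] The net effect of _init_conjuncts is an index from SlotId to the conjuncts that reference that slot, so the later filter passes can fetch a column's predicates without rescanning all conjunct_ctxs:

    auto iter = _slot_conjuncts.find(slot_id);
    if (iter != _slot_conjuncts.end()) {
        for (ExprContext* ctx : iter->second) {
            // evaluate ctx against the column's min/max statistics
        }
    }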
 
-Status ParquetReader::_init_row_group_reader(const TupleDescriptor* tuple_desc,
-                                             int64_t range_start_offset, 
int64_t range_size,
-                                             const std::vector<ExprContext*>& 
conjunct_ctxs) {
-    // todo: extract as create()
-    _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, 
_read_columns,
-                                               _map_column, conjunct_ctxs));
-    RETURN_IF_ERROR(_row_group_reader->init(tuple_desc, range_start_offset, 
range_size));
+Status ParquetReader::_filter_row_groups(std::vector<int32_t>* 
read_row_group_ids) {
+    if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size < 
0) {
+        return Status::EndOfFile("No row group to read");
+    }
+    for (int32_t row_group_idx = 0; row_group_idx < _total_groups; row_group_idx++) {
+        const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx];
+        if (_is_misaligned_range_group(row_group)) {
+            continue;
+        }
+        bool filter_group = false;
+        RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
+        if (!filter_group) {
+            read_row_group_ids->emplace_back(row_group_idx);
+            break;
+        }
+    }
     return Status::OK();
 }
 
+bool ParquetReader::_is_misaligned_range_group(const tparquet::RowGroup& 
row_group) {
+    int64_t start_offset = 
_get_column_start_offset(row_group.columns[0].meta_data);
+
+    auto last_column = row_group.columns[row_group.columns.size() - 
1].meta_data;
+    int64_t end_offset = _get_column_start_offset(last_column) + 
last_column.total_compressed_size;
+
+    int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2;
+    if (!(row_group_mid >= _range_start_offset &&
+          row_group_mid < _range_start_offset + _range_size)) {
+        return true;
+    }
+    return false;
+}
+
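[editor's note] The midpoint rule makes each row group belong to exactly one scan range, even when ranges split a group in the middle. With illustrative numbers:

    // start_offset = 0, end_offset = 100  =>  row_group_mid = 0 + (100 - 0) / 2 = 50
    // scanner A with range [0, 60):    50 is inside  -> reads the group
    // scanner B with range [60, 120):  50 is outside -> skips it as misaligned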
 bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk> 
columns) {
     _page_index.reset(new PageIndex());
     return _page_index->check_and_get_page_index_ranges(columns);
 }
 
-Status ParquetReader::_process_page_index(std::vector<tparquet::ColumnChunk> 
columns) {
+Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
+                                          std::vector<RowRange>& 
skipped_row_ranges) {
     int64_t buffer_size = _page_index->_column_index_size + 
_page_index->_offset_index_size;
-    uint8_t buff[buffer_size];
     for (auto col_id : _include_column_ids) {
-        auto chunk = columns[col_id];
-        RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff));
-        // todo: use page index filter min/max val
-        RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, 
buffer_size));
-        // todo: calculate row range
+        // a variable-length array is a non-standard extension and may
+        // overflow the stack for large page indexes
+        std::vector<uint8_t> buff(buffer_size);
+        auto chunk = row_group.columns[col_id];
+        tparquet::ColumnIndex column_index;
+        RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff.data(), &column_index));
+        VLOG_DEBUG << "_column_index_size : " << _page_index->_column_index_size;
+        const int num_of_page = column_index.null_pages.size();
+        if (num_of_page <= 1) {
+            // a single-page chunk cannot be pruned by the page index
+            continue;
+        }
+        VLOG_DEBUG << "page 0 max_values : " << column_index.max_values[0];
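+        // note: _slot_conjuncts is keyed by SlotId; this lookup assumes the
+        // parquet column id and the slot id coincide for top-level columns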
+        auto conjunct_iter = _slot_conjuncts.find(col_id);
+        if (_slot_conjuncts.end() == conjunct_iter) {
+            continue;
+        }
+        auto conjuncts = conjunct_iter->second;
+        std::vector<int> candidate_page_range;
+        _page_index->collect_skipped_page_range(conjuncts, 
candidate_page_range);
+        tparquet::OffsetIndex offset_index;
+        RETURN_IF_ERROR(
+                _page_index->parse_offset_index(chunk, buff.data(), buffer_size, &offset_index));
+        VLOG_DEBUG << "page_locations size : " << 
offset_index.page_locations.size();
+        for (int page_id : candidate_page_range) {
+            RowRange skipped_row_range;
+            _page_index->create_skipped_row_range(offset_index, 
row_group.num_rows, page_id,
+                                                  &skipped_row_range);
+            skipped_row_ranges.emplace_back(skipped_row_range);
+        }
+    }
+    return Status::OK();
+}
+
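[editor's note] create_skipped_row_range itself is not part of this hunk; under standard Parquet offset-index semantics the range for a pruned page can be derived from consecutive first_row_index entries, roughly like this sketch (Thrift field names are real, the rest is assumed):

    const auto& locations = offset_index.page_locations;
    RowRange range;
    range.first_row = locations[page_id].first_row_index;
    range.last_row = (page_id + 1 < static_cast<int>(locations.size()))
                             ? locations[page_id + 1].first_row_index
                             : num_rows;  // the last page runs to the group's row count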
+Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& 
row_group,
+                                                bool* filter_group) {
+    _process_column_stat_filter(row_group.columns, filter_group);
+    _init_chunk_dicts();
+    RETURN_IF_ERROR(_process_dict_filter(filter_group));
+    _init_bloom_filter();
+    RETURN_IF_ERROR(_process_bloom_filter(filter_group));
+    return Status::OK();
+}
+
+Status ParquetReader::_process_column_stat_filter(const 
std::vector<tparquet::ColumnChunk>& columns,
+                                                  bool* filter_group) {
+    // Skip the whole group when column statistics prove no row can match
+    std::unordered_set<int> parquet_column_ids(_include_column_ids.begin(),
+                                               _include_column_ids.end());
+    for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) 
{
+        auto slot_iter = _slot_conjuncts.find(slot_id);
+        if (slot_iter == _slot_conjuncts.end()) {
+            continue;
+        }
+        const std::string& col_name = 
_tuple_desc->slots()[slot_id]->col_name();
+        auto col_iter = _map_column.find(col_name);
+        if (col_iter == _map_column.end()) {
+            continue;
+        }
+        int parquet_col_id = col_iter->second;
+        if (parquet_column_ids.end() == parquet_column_ids.find(parquet_col_id)) {
+            // Column not exist in parquet file
+            continue;
+        }
+        auto statistic = columns[parquet_col_id].meta_data.statistics;
+        if (!statistic.__isset.max || !statistic.__isset.min) {
+            continue;
+        }
+        // Min-max of statistic is plain-encoded value
+        *filter_group = _determine_filter_min_max(slot_iter->second, 
statistic.min, statistic.max);
+        if (*filter_group) {
+            break;
+        }
     }
     return Status::OK();
 }
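[editor's note] A worked example of the statistics filter, with illustrative values: for a conjunct `c1 < 10` on an INT32 column whose group statistics decode to min = 20 and max = 95, no row in the group can satisfy the predicate, so _determine_filter_min_max returns true and the whole group is skipped before any page I/O.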
+
+void ParquetReader::_init_chunk_dicts() {}
+
+Status ParquetReader::_process_dict_filter(bool* filter_group) {
+    return Status::OK();
+}
+
+void ParquetReader::_init_bloom_filter() {}
+
+Status ParquetReader::_process_bloom_filter(bool* filter_group) {
+    RETURN_IF_ERROR(_file_reader->seek(0));
+    return Status::OK();
+}
+
+int64_t ParquetReader::_get_column_start_offset(const 
tparquet::ColumnMetaData& column) {
+    if (column.__isset.dictionary_page_offset) {
+        DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
+        return column.dictionary_page_offset;
+    }
+    return column.data_page_offset;
+}
 } // namespace doris::vectorized
\ No newline at end of file
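[editor's note] _get_column_start_offset encodes a Parquet layout fact: a dictionary-encoded chunk writes its dictionary page before the first data page, so the chunk's true start is the smaller dictionary_page_offset (offsets illustrative):

    // dictionary_page_offset = 4000   <- actual chunk start
    // data_page_offset       = 4100
    // without the __isset check, a chunk with no dictionary page would
    // report a default 0 and yield a bogus start offset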
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index a4cf4e70e2..c1d0ec4247 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -42,6 +42,12 @@ namespace doris::vectorized {
 //        int64_t total_bytes = 0;
 //    };
 class RowGroupReader;
+class PageIndex;
+
+struct RowRange {
+    int64_t first_row;
+    int64_t last_row;
+};
 
 class ParquetReadColumn {
 public:
@@ -58,49 +64,73 @@ private:
 
 class ParquetReader {
 public:
-    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
+    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, 
size_t batch_size,
                   int64_t range_start_offset, int64_t range_size);
 
     ~ParquetReader();
 
     Status init_reader(const TupleDescriptor* tuple_desc,
                        const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                       const std::vector<ExprContext*>& conjunct_ctxs, const 
std::string& timezone);
-
-    Status read_next_batch(Block* block);
+                       std::vector<ExprContext*>& conjunct_ctxs, const 
std::string& timezone);
 
-    bool has_next() const { return !_batch_eof; };
+    Status read_next_batch(Block* block, bool* eof);
 
-    //        std::shared_ptr<Statistics>& statistics() { return _statistics; }
+    // std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
 
     int64_t size() const { return _file_reader->size(); }
 
 private:
     Status _init_read_columns(const std::vector<SlotDescriptor*>& 
tuple_slot_descs);
-    Status _init_row_group_reader(const TupleDescriptor* tuple_desc, int64_t 
range_start_offset,
-                                  int64_t range_size,
-                                  const std::vector<ExprContext*>& 
conjunct_ctxs);
-    void _fill_block_data(Block* block, int group_id);
+    Status _init_row_group_readers(const TupleDescriptor* tuple_desc, int64_t 
range_start_offset,
+                                   int64_t range_size,
+                                   const std::vector<ExprContext*>& 
conjunct_ctxs);
+    void _init_conjuncts(const TupleDescriptor* tuple_desc,
+                         const std::vector<ExprContext*>& conjunct_ctxs);
+    // Page Index Filter
     bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
-    Status _process_page_index(std::vector<tparquet::ColumnChunk> columns);
+    Status _process_page_index(tparquet::RowGroup& row_group,
+                               std::vector<RowRange>& skipped_row_ranges);
+
+    // Row Group Filter
+    bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
+    Status _process_column_stat_filter(const 
std::vector<tparquet::ColumnChunk>& column_meta,
+                                       bool* filter_group);
+    Status _process_row_group_filter(const tparquet::RowGroup& row_group, 
bool* filter_group);
+    void _init_chunk_dicts();
+    Status _process_dict_filter(bool* filter_group);
+    void _init_bloom_filter();
+    Status _process_bloom_filter(bool* filter_group);
+    Status _filter_row_groups(std::vector<int32_t>* read_row_group_ids);
+    int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column);
+    bool _determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
+                                   const std::string& encoded_min, const 
std::string& encoded_max);
+    void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const 
char* max_bytes,
+                                bool& need_filter);
+    void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const 
char* max_bytes,
+                            bool& need_filter);
 
 private:
     FileReader* _file_reader;
     std::shared_ptr<FileMetaData> _file_metadata;
-    std::shared_ptr<RowGroupReader> _row_group_reader;
+    const tparquet::FileMetaData* _t_metadata; // points into _file_metadata, not owned
     std::shared_ptr<PageIndex> _page_index;
-    int _total_groups; // num of groups(stripes) of a parquet(orc) file
-    //    int _current_group;                     // current group(stripe)
+    std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
+    int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
+    int32_t _current_row_group_id;
     //        std::shared_ptr<Statistics> _statistics;
     const int32_t _num_of_columns_from_file;
-
     std::map<std::string, int> _map_column; // column-name <---> column-index
-    std::vector<int> _include_column_ids;   // columns that need to get from 
file
+    std::vector<ExprContext*>* _conjunct_ctxs; // owned by the caller
+    std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
+    std::vector<int> _include_column_ids; // columns that need to get from file
     std::vector<ParquetReadColumn> _read_columns;
     // parquet file reader object
-    bool* _batch_eof;
+    size_t _batch_size;
     int64_t _range_start_offset;
     int64_t _range_size;
+
+    const TupleDescriptor* _tuple_desc; // get all slot info
 };
 } // namespace doris::vectorized
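[editor's note] Putting the public surface together, a minimal lifecycle sketch (names from this header; the batch size 4096 is an arbitrary illustrative value):

    ParquetReader reader(file_reader, num_of_columns_from_file, 4096,
                         range_start_offset, range_size);
    RETURN_IF_ERROR(reader.init_reader(tuple_desc, tuple_slot_descs, conjunct_ctxs, timezone));
    // ... drive reader.read_next_batch(&block, &eof) as in the .cpp notes above ...
    reader.close();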
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 95df8bd9a2..c334b105ed 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -147,7 +147,7 @@ static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk*
     // load page data into underlying container
     chunk_reader.load_page_data();
     // decode page data
-    return chunk_reader.decode_values(doris_column, data_type, 
chunk_reader.num_values());
+    return chunk_reader.decode_values(doris_column, data_type, 
chunk_reader.remaining_num_values());
 }
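[editor's note] remaining_num_values() reflects the renamed counter in the column chunk reader: it is assumed to start at the page's value count after load_page_data() and to shrink as decode_values() consumes values, so the test decodes whatever is left of the current page rather than a fixed total.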
 
 static void create_block(std::unique_ptr<vectorized::Block>& block) {

