AshinGau commented on code in PR #12652:
URL: https://github.com/apache/doris/pull/12652#discussion_r972541784


##########
be/src/vec/exec/format/parquet/vparquet_column_reader.h:
##########
@@ -77,6 +80,9 @@ class ParquetColumnReader {
     std::unique_ptr<ColumnChunkReader> _chunk_reader;
     std::unique_ptr<level_t[]> _def_levels_buf = nullptr;
     size_t _def_levels_buf_size = 0;
+    std::unique_ptr<tparquet::OffsetIndex> _offset_index;

Review Comment:
   Why does `tparquet::OffsetIndex` need to be released here? Is it heap- or stack-allocated? 
If it is heap-allocated, it would be better to release it in the parent class.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -233,28 +233,57 @@ Status 
ParquetReader::_process_page_index(tparquet::RowGroup& row_group) {
     int64_t bytes_read = 0;
     RETURN_IF_ERROR(
             _file_reader->readat(_page_index->_column_index_start, 
buffer_size, &bytes_read, buff));
-    for (auto col_id : _include_column_ids) {
-        auto conjunct_iter = _slot_conjuncts.find(col_id);
+
+    std::vector<RowRange> skipped_row_ranges;
+    for (auto& read_col : _read_columns) {
+        auto conjunct_iter = _slot_conjuncts.find(read_col._parquet_col_id);
         if (_slot_conjuncts.end() == conjunct_iter) {
             continue;
         }
-        auto& chunk = row_group.columns[col_id];
+        auto& chunk = row_group.columns[read_col._parquet_col_id];
         tparquet::ColumnIndex column_index;
         RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, 
&column_index));
-        const int num_of_page = column_index.null_pages.size();
-        if (num_of_page <= 1) {
+        const int num_of_pages = column_index.null_pages.size();
+        if (num_of_pages <= 0) {
             break;
         }
         auto& conjuncts = conjunct_iter->second;
-        std::vector<int> candidate_page_range;
-        _page_index->collect_skipped_page_range(conjuncts, 
candidate_page_range);
+        std::vector<int> skipped_page_range;
+        _page_index->collect_skipped_page_range(&column_index, conjuncts, 
skipped_page_range);
+        if (skipped_page_range.empty()) {
+            return Status::OK();
+        }
         tparquet::OffsetIndex offset_index;
         RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, 
buffer_size, &offset_index));
-        for (int page_id : candidate_page_range) {
+        for (int page_id : skipped_page_range) {
             RowRange skipped_row_range;
             _page_index->create_skipped_row_range(offset_index, 
row_group.num_rows, page_id,
                                                   &skipped_row_range);
-            _skipped_row_ranges.emplace_back(skipped_row_range);
+            // use the union row range
+            skipped_row_ranges.emplace_back(skipped_row_range);
+        }
+        _col_offsets.emplace(read_col._parquet_col_id, offset_index);
+    }
+    if (skipped_row_ranges.empty()) {
+        return Status::OK();
+    }
+
+    std::sort(skipped_row_ranges.begin(), skipped_row_ranges.end(),
+              [](const RowRange& lhs, const RowRange& rhs) {
+                  return std::tie(lhs.first_row, lhs.last_row) <
+                         std::tie(rhs.first_row, rhs.last_row);
+              });
+    int skip_end = -1;
+    for (auto& skip_range : skipped_row_ranges) {
+        LOG(WARNING) << skip_range.first_row << " " << skip_range.last_row << 
" | ";

Review Comment:
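   Too much logging: this `LOG(WARNING)` is emitted once per skipped row range inside the loop. Please remove it or lower its level.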



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -232,28 +232,57 @@ Status 
ParquetReader::_process_page_index(tparquet::RowGroup& row_group) {
     int64_t bytes_read = 0;
     RETURN_IF_ERROR(
             _file_reader->readat(_page_index->_column_index_start, 
buffer_size, &bytes_read, buff));
-    for (auto col_id : _include_column_ids) {
-        auto conjunct_iter = _slot_conjuncts.find(col_id);
+
+    std::vector<RowRange> skipped_row_ranges;
+    for (auto& read_col : _read_columns) {
+        auto conjunct_iter = _slot_conjuncts.find(read_col._parquet_col_id);
         if (_slot_conjuncts.end() == conjunct_iter) {
             continue;
         }
-        auto& chunk = row_group.columns[col_id];
+        auto& chunk = row_group.columns[read_col._parquet_col_id];
         tparquet::ColumnIndex column_index;
         RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, 
&column_index));
-        const int num_of_page = column_index.null_pages.size();
-        if (num_of_page <= 1) {
+        const int num_of_pages = column_index.null_pages.size();
+        if (num_of_pages <= 0) {
             break;
         }
         auto& conjuncts = conjunct_iter->second;
-        std::vector<int> candidate_page_range;
-        _page_index->collect_skipped_page_range(conjuncts, 
candidate_page_range);
+        std::vector<int> skipped_page_range;
+        _page_index->collect_skipped_page_range(&column_index, conjuncts, 
skipped_page_range);
+        if (skipped_page_range.empty()) {
+            return Status::OK();
+        }
         tparquet::OffsetIndex offset_index;
         RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, 
buffer_size, &offset_index));
-        for (int page_id : candidate_page_range) {
+        for (int page_id : skipped_page_range) {
             RowRange skipped_row_range;
             _page_index->create_skipped_row_range(offset_index, 
row_group.num_rows, page_id,
                                                   &skipped_row_range);
-            _skipped_row_ranges.emplace_back(skipped_row_range);
+            // use the union row range
+            skipped_row_ranges.emplace_back(skipped_row_range);
+        }
+        _col_offsets.emplace(read_col._parquet_col_id, offset_index);
+    }
+    if (skipped_row_ranges.empty()) {
+        return Status::OK();
+    }
+
+    std::sort(skipped_row_ranges.begin(), skipped_row_ranges.end(),
+              [](const RowRange& lhs, const RowRange& rhs) {
+                  return std::tie(lhs.first_row, lhs.last_row) <
+                         std::tie(rhs.first_row, rhs.last_row);
+              });
+    int skip_end = -1;
+    for (auto& skip_range : skipped_row_ranges) {
+        VLOG_DEBUG << skip_range.first_row << " " << skip_range.last_row << " 
| ";

Review Comment:
   This log line has no description; it prints only bare numbers, so the output is hard to interpret. Please add labels or remove it.



##########
be/src/vec/exec/format/parquet/vparquet_page_index.cpp:
##########
@@ -35,9 +36,34 @@ Status 
PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
     return Status::OK();
 }
 
-Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*> 
conjuncts,
-                                             std::vector<int> page_range) {
-    return Status();
+Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* 
column_index,
+                                             std::vector<ExprContext*> 
conjuncts,
+                                             std::vector<int> skipped_ranges) {
+    const vector<std::string>& encoded_min_vals = column_index->min_values;
+    const vector<std::string>& encoded_max_vals = column_index->max_values;
+    DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size());
+
+    const int num_of_pages = column_index->null_pages.size();
+    for (int page_id = 0; page_id < num_of_pages; page_id++) {
+        for (int i = 0; i < conjuncts.size(); i++) {
+            ExprContext* conjunct_expr = conjuncts[i];
+            if (conjunct_expr->root()->get_child(1) == nullptr) {
+                // conjunct value is null
+                continue;
+            }
+            //        bool is_null_page = column_index->null_pages[page_id];
+            //        if (UNLIKELY(is_null_page) && is_not_null_predicate()) {
+            //             skipped_ranges.emplace_back(page_id);
+            //        }
+            if (_filter_page_by_min_max(conjunct_expr, 
encoded_min_vals[page_id],
+                                        encoded_max_vals[page_id])) {
+                skipped_ranges.emplace_back(page_id);
+                break;
+            }
+        }
+    }
+    LOG(WARNING) << "skipped_ranges.size()=" << skipped_ranges.size();

Review Comment:
   Too much logging. Please remove this, or use a debug-level log (e.g. `VLOG_DEBUG`) instead.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -87,14 +89,14 @@ Status ParquetReader::_init_read_columns(const 
std::vector<SlotDescriptor*>& tup
         auto parquet_col_id = iter->second;
         if (iter != _map_column.end()) {
             _include_column_ids.emplace_back(parquet_col_id);
+            ParquetReadColumn column(parquet_col_id, slot_desc);
+            _read_columns.emplace_back(column);

Review Comment:
   You can write `_read_columns.emplace_back(parquet_col_id, slot_desc)` directly, instead of constructing a temporary `ParquetReadColumn` and then copying it into the vector.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to