morningman commented on code in PR #22115:
URL: https://github.com/apache/doris/pull/22115#discussion_r1271375451


##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -511,6 +511,34 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
+Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof,
+                                     TPushAggOp::type push_down_agg_type_opt) {
+    if (push_down_agg_type_opt != TPushAggOp::type::COUNT) {
+        return Status::NotSupported("min/max push down is not supported for parquet files");
+    }
+    size_t rows = 0;
+
+    // out of use _t_metadata->num_rows , because for the same file,
+    // the optimizer may generate multiple VFileScanner with different _scan_range
+    while (_read_row_groups.size() > 0) {
+        _next_row_group_reader();
+        rows += _current_group_reader->get__remaining_rows();
+    }
+
+    //fill one column is enough
+    auto cols = block->mutate_columns();
+    for (auto& col : cols) {
+        col->resize(rows);

Review Comment:
   The `rows` value may be too large for the resize.
   Normally, a block only returns 4096 rows, but here you may return an unlimited number of rows.
   I think it should be split into batches?
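
   A minimal sketch of what batched emission could look like, reusing the names from the diff above; `_push_down_count_inited` and `_remaining_push_down_count` are hypothetical members added only for illustration:

   ```cpp
   Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof,
                                        TPushAggOp::type push_down_agg_type_opt) {
       if (push_down_agg_type_opt != TPushAggOp::type::COUNT) {
           return Status::NotSupported("min/max push down is not supported for parquet files");
       }
       // Compute the total remaining rows once, then hand it out in batch-sized chunks.
       if (!_push_down_count_inited) {
           while (_read_row_groups.size() > 0) {
               _next_row_group_reader();
               _remaining_push_down_count += _current_group_reader->get__remaining_rows();
           }
           _push_down_count_inited = true;
       }
       const size_t batch_size = 4096; // the usual block size mentioned above
       size_t rows = std::min(batch_size, _remaining_push_down_count);
       _remaining_push_down_count -= rows;

       // Filling one column is enough for a COUNT-only block.
       auto cols = block->mutate_columns();
       for (auto& col : cols) {
           col->resize(rows);
       }
       *read_rows = rows;
       *eof = (_remaining_push_down_count == 0);
       return Status::OK();
   }
   ```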



##########
be/src/vec/exec/format/generic_reader.h:
##########
@@ -31,6 +31,12 @@ class Block;
 class GenericReader {
 public:
     virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
+
+    virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof,

Review Comment:
   How about merging these 2 methods?
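
   One way to merge them, as a sketch: keep a single virtual that always carries the push-down hint and add a thin non-virtual overload for callers that don't have one (the `TPushAggOp::type::NONE` default here is an assumption):

   ```cpp
   class GenericReader {
   public:
       // Single virtual entry point that always carries the push-down hint.
       virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof,
                                     TPushAggOp::type push_down_agg_type_opt) = 0;

       // Thin non-virtual overload for callers without a push-down hint.
       // Derived classes may need `using GenericReader::get_next_block;` to keep it visible.
       Status get_next_block(Block* block, size_t* read_rows, bool* eof) {
           return get_next_block(block, read_rows, eof, TPushAggOp::type::NONE);
       }
   };
   ```
   A defaulted argument on the virtual itself would also work, but default arguments on virtual functions are bound statically, which is easy to get wrong when calling through `GenericReader*`.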



##########
be/src/vec/exec/scan/vscan_node.h:
##########
@@ -351,6 +351,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
     std::unordered_map<std::string, int> _colname_to_slot_id;
     std::vector<int> _col_distribute_ids;
 
+public:
+    TPushAggOp::type push_down_agg_type_opt;

Review Comment:
   Better not to use `public` to define a field.
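
   A minimal sketch of keeping the member non-public, following the existing leading-underscore convention; the getter name is made up for illustration:

   ```cpp
   class VScanNode : public ExecNode, public RuntimeFilterConsumer {
   public:
       TPushAggOp::type get_push_down_agg_type() const { return _push_down_agg_type_opt; }

   protected:
       TPushAggOp::type _push_down_agg_type_opt = TPushAggOp::type::NONE;
   };
   ```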



##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java:
##########
@@ -1363,8 +1360,8 @@ protected void toThrift(TPlanNode msg) {
         msg.olap_scan_node.setTableName(olapTable.getName());
         msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
 
-        if (pushDownAggNoGroupingOp != null) {
-            msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
+        if (pushDownAggNoGroupingOp != TPushAggOp.NONE) {
+            msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);

Review Comment:
   I think we can ALWAYS set this field



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java:
##########
@@ -310,4 +299,28 @@ private void genSlotToSchemaIdMap() {
         }
         params.setSlotNameToSchemaPos(columnNameToPosition);
     }
+
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        TFileFormatType fileFormatType;
+        try {
+            fileFormatType = getFileFormatType();
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
+
+        String aggFunctionName = aggExpr.getFnName().getFunction();
+        if (aggFunctionName.equalsIgnoreCase("COUNT") && fileFormatType == TFileFormatType.FORMAT_PARQUET) {

Review Comment:
   Need to implement ORC too.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -638,12 +638,11 @@ struct TOlapScanNode {
   // It's limit for scanner instead of scanNode so we add a new limit.
   10: optional i64 sort_limit
   11: optional bool enable_unique_key_merge_on_write
-  12: optional TPushAggOp push_down_agg_type_opt
-  13: optional bool use_topn_opt
-  14: optional list<Descriptors.TOlapTableIndex> indexes_desc
-  15: optional set<i32> output_column_unique_ids
-  16: optional list<i32> distribute_column_ids
-  17: optional i32 schema_version
+  12: optional bool use_topn_opt

Review Comment:
   You can't modify the original structure of the thrift file, or it will cause problems when upgrading.
   You can mark the old `push_down_agg_type_opt` as `Deprecated`, and add some compatibility handling when visiting this field.
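
   For example, the BE could keep accepting plans from an older FE by falling back to the deprecated field. This is only a sketch; the exact field locations and `__isset` members are assumptions:

   ```cpp
   // Assuming the new field is added to TPlanNode and the old one stays in
   // TOlapScanNode, marked deprecated.
   TPushAggOp::type push_down_agg_op = TPushAggOp::type::NONE;
   if (tnode.__isset.push_down_agg_type_opt) {
       // New location of the field.
       push_down_agg_op = tnode.push_down_agg_type_opt;
   } else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) {
       // Deprecated field kept for compatibility with older FE versions.
       push_down_agg_op = tnode.olap_scan_node.push_down_agg_type_opt;
   }
   ```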



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java:
##########
@@ -310,4 +299,28 @@ private void genSlotToSchemaIdMap() {
         }
         params.setSlotNameToSchemaPos(columnNameToPosition);
     }
+
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        TFileFormatType fileFormatType;
+        try {
+            fileFormatType = getFileFormatType();
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
+
+        String aggFunctionName = aggExpr.getFnName().getFunction();
+        if (aggFunctionName.equalsIgnoreCase("COUNT") && fileFormatType == TFileFormatType.FORMAT_PARQUET) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {

Review Comment:
   For external tables, always return false.



##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -245,7 +245,19 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
         RETURN_IF_ERROR(_init_src_block(block));
         {
             SCOPED_TIMER(_get_block_timer);
+
             // Read next block.
+
+            if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) {
+                //Prevent FE  misjudging the "select count/min/max ..." statement
+                if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows,

Review Comment:
   So if `_cur_reader->get_next_block` returns an error here, it will go on to call another `get_next_block()`, just like a retry?



##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -245,7 +245,19 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
         RETURN_IF_ERROR(_init_src_block(block));
         {
             SCOPED_TIMER(_get_block_timer);
+
             // Read next block.
+
+            if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) {
+                //Prevent FE  misjudging the "select count/min/max ..." statement
+                if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows,

Review Comment:
   I suggest that we should make sure the FE gives the right plan, and here we just use `if...else`.
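
   Something along these lines, as a sketch; `_cur_reader_eof` and the exact surrounding variable names are assumptions about the caller's context:

   ```cpp
   // Trust the plan produced by the FE and branch explicitly, instead of
   // treating a non-OK status as a signal to fall through to the normal read path.
   if (_parent->push_down_agg_type_opt != TPushAggOp::type::NONE) {
       RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof,
                                                   _parent->push_down_agg_type_opt));
   } else {
       RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
   }
   ```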



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java:
##########
@@ -278,16 +274,9 @@ protected Map<String, String> getLocationProperties() throws UserException  {
     @Override
     protected TFileAttributes getFileAttributes() throws UserException {
         TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-        java.util.Map<String, String> delimiter = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
-        textParams.setColumnSeparator(delimiter.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
-        textParams.setLineDelimiter(delimiter.getOrDefault(PROP_LINE_DELIMITER, DEFAULT_LINE_DELIMITER));
-        if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE2) != null) {
-            textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE2));
-        } else if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE3) != null) {
-            textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE3));
-        } else {
-            textParams.setArrayDelimiter(DEFAULT_ARRAY_DELIMITER);
-        }
+        textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+                .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
+        textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);

Review Comment:
   Why change this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

