morningman commented on code in PR #22115: URL: https://github.com/apache/doris/pull/22115#discussion_r1271375451
########## be/src/vec/exec/format/parquet/vparquet_reader.cpp: ########## @@ -511,6 +511,34 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor return Status::OK(); } +Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof, + TPushAggOp::type push_down_agg_type_opt) { + if (push_down_agg_type_opt != TPushAggOp::type::COUNT) { + return Status::NotSupported("min/max push down is not supported for parquet files"); + } + size_t rows = 0; + + // out of use _t_metadata->num_rows , because for the same file, + // the optimizer may generate multiple VFileScanner with different _scan_range + while (_read_row_groups.size() > 0) { + _next_row_group_reader(); + rows += _current_group_reader->get__remaining_rows(); + } + + //fill one column is enough + auto cols = block->mutate_columns(); + for (auto& col : cols) { + col->resize(rows); Review Comment: The `rows` may be too large for the resize? Normally, a block only returns 4096 rows. But here you may return an unlimited number of rows. I think it should be split into batches? ########## be/src/vec/exec/format/generic_reader.h: ########## @@ -31,6 +31,12 @@ class Block; class GenericReader { public: virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; + + virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof, Review Comment: How about merging these 2 methods? 
########## be/src/vec/exec/scan/vscan_node.h: ########## @@ -351,6 +351,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { std::unordered_map<std::string, int> _colname_to_slot_id; std::vector<int> _col_distribute_ids; +public: + TPushAggOp::type push_down_agg_type_opt; Review Comment: Better not to use public to define a field ########## fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java: ########## @@ -1363,8 +1360,8 @@ protected void toThrift(TPlanNode msg) { msg.olap_scan_node.setTableName(olapTable.getName()); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); - if (pushDownAggNoGroupingOp != null) { - msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + if (pushDownAggNoGroupingOp != TPushAggOp.NONE) { + msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); Review Comment: I think we can ALWAYS set this field ########## fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java: ########## @@ -310,4 +299,28 @@ private void genSlotToSchemaIdMap() { } params.setSlotNameToSchemaPos(columnNameToPosition); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + TFileFormatType fileFormatType; + try { + fileFormatType = getFileFormatType(); + } catch (UserException e) { + throw new RuntimeException(e); + } + + String aggFunctionName = aggExpr.getFnName().getFunction(); + if (aggFunctionName.equalsIgnoreCase("COUNT") && fileFormatType == TFileFormatType.FORMAT_PARQUET) { Review Comment: Need to implement ORC too ########## gensrc/thrift/PlanNodes.thrift: ########## @@ -638,12 +638,11 @@ struct TOlapScanNode { // It's limit for scanner instead of scanNode so we add a new limit. 
10: optional i64 sort_limit 11: optional bool enable_unique_key_merge_on_write - 12: optional TPushAggOp push_down_agg_type_opt - 13: optional bool use_topn_opt - 14: optional list<Descriptors.TOlapTableIndex> indexes_desc - 15: optional set<i32> output_column_unique_ids - 16: optional list<i32> distribute_column_ids - 17: optional i32 schema_version + 12: optional bool use_topn_opt Review Comment: You can't modify the original structure of the thrift definition, or it will cause problems when upgrading. You can mark the old `push_down_agg_type_opt` as `Deprecated`, and add some compatibility handling when visiting this field ########## fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java: ########## @@ -310,4 +299,28 @@ private void genSlotToSchemaIdMap() { } params.setSlotNameToSchemaPos(columnNameToPosition); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + TFileFormatType fileFormatType; + try { + fileFormatType = getFileFormatType(); + } catch (UserException e) { + throw new RuntimeException(e); + } + + String aggFunctionName = aggExpr.getFnName().getFunction(); + if (aggFunctionName.equalsIgnoreCase("COUNT") && fileFormatType == TFileFormatType.FORMAT_PARQUET) { + return true; + } + return false; + } + + @Override + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { Review Comment: For external tables, always return false. ########## be/src/vec/exec/scan/vfile_scanner.cpp: ########## @@ -245,7 +245,19 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR(_init_src_block(block)); { SCOPED_TIMER(_get_block_timer); + // Read next block. + + if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) { + //Prevent FE misjudging the "select count/min/max ..." 
statement + if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows, Review Comment: So if here `_cur_reader->get_next_block` returns an error, it will go on calling another `get_next_block()`, just like a retry? ########## be/src/vec/exec/scan/vfile_scanner.cpp: ########## @@ -245,7 +245,19 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR(_init_src_block(block)); { SCOPED_TIMER(_get_block_timer); + // Read next block. + + if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) { + //Prevent FE misjudging the "select count/min/max ..." statement + if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows, Review Comment: I suggest that we should make sure FE gives the right plan, and here we just use `if...else`. ########## fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java: ########## @@ -278,16 +274,9 @@ protected Map<String, String> getLocationProperties() throws UserException { @Override protected TFileAttributes getFileAttributes() throws UserException { TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - java.util.Map<String, String> delimiter = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); - textParams.setColumnSeparator(delimiter.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); - textParams.setLineDelimiter(delimiter.getOrDefault(PROP_LINE_DELIMITER, DEFAULT_LINE_DELIMITER)); - if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE2) != null) { - textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE2)); - } else if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE3) != null) { - textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE3)); - } else { - textParams.setArrayDelimiter(DEFAULT_ARRAY_DELIMITER); - } + textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() + .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); + 
textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); Review Comment: Why change this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org