Copilot commented on code in PR #59123: URL: https://github.com/apache/doris/pull/59123#discussion_r2638492063
########## regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_min_max.groovy: ########## @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_optimize_min_max", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_optimize_count" + + try { + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """ switch ${catalog_name} """ + sql """ use format_v2 """ + + sql """ set enable_iceberg_min_max_optimization=false; """ + String SQLSTR = """ 
+ MIN(id) AS id_min, + MAX(id) AS id_max, + MIN(col_boolean) AS col_boolean_min, + MAX(col_boolean) AS col_boolean_max, + MIN(col_short) AS col_short_min, + MAX(col_short) AS col_short_max, + MIN(col_byte) AS col_byte_min, + MAX(col_byte) AS col_byte_max, + MIN(col_integer) AS col_integer_min, + MAX(col_integer) AS col_integer_max, + MIN(col_long) AS col_long_min, + MAX(col_long) AS col_long_max, + MIN(col_float) AS col_float_min, + MAX(col_float) AS col_float_max, + MIN(col_double) AS col_double_min, + MAX(col_double) AS col_double_max, + MIN(col_date) AS col_date_min, + MAX(col_date) AS col_date_max + """; + def sqlstr1 = """ SELECT ${SQLSTR} FROM sample_mor_parquet; """ + def sqlstr2 = """ select ${SQLSTR} from sample_cow_parquet; """ + def sqlstr3 = """ select ${SQLSTR} from sample_mor_orc; """ + def sqlstr4 = """ select ${SQLSTR} from sample_mor_parquet; """ + + // don't use push down count + sql """ set enable_iceberg_min_max_optimization=false; """ + qt_false01 """${sqlstr1}""" + qt_false02 """${sqlstr2}""" + qt_false03 """${sqlstr3}""" + qt_false04 """${sqlstr4}""" + + // use push down count + sql """ set enable_iceberg_min_max_optimization=true; """ + for (String val: ["1K", "0"]) { + sql "set file_split_size=${val}" + qt_q01 """${sqlstr1}""" + qt_q02 """${sqlstr2}""" + qt_q03 """${sqlstr3}""" + qt_q04 """${sqlstr4}""" + } + sql "unset variable file_split_size;" + + // traditional mode + sql """set num_files_in_batch_mode=100000""" + explain { + sql("""select * from sample_cow_orc""") + notContains "approximate" + } + explain { + sql("""${sqlstr1}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_cow_parquet""") + notContains "approximate" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_mor_orc""") + notContains "approximate" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=MINMAX""" + } + // because it has dangling delete + 
explain { + sql("""${sqlstr4}""") + contains """pushdown agg=MINMAX""" + } + + // batch mode + sql """set num_files_in_batch_mode=1""" + explain { + sql("""select * from sample_cow_orc""") + contains "approximate" + } + explain { + sql("""${sqlstr1}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_cow_parquet""") + contains "approximate" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_mor_orc""") + contains "approximate" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_mor_parquet""") + contains "approximate" + } + // because it has dangling delete + explain { + sql("""${sqlstr4}""") + contains """pushdown agg=MINMAX""" + contains "approximate" + } + + // don't use push down count Review Comment: The comment at line 158 says "don't use push down count" which is incorrect. This should say "don't use push down min/max" since this test is about min/max optimization, not count optimization. This is likely a copy-paste error from the count optimization test. 
```suggestion // don't use push down min/max ``` ########## be/src/vec/exec/format/table/iceberg_reader.cpp: ########## @@ -146,6 +157,82 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } +Status IcebergTableReader::_insert_min_max_value_column(Block* block) { + const auto& min_max_values = _range.min_max_values; + auto mutate_columns = block->mutate_columns(); + for (const auto& [slot_id, min_max_value] : min_max_values) { + size_t column_index = 0; + bool found = false; + for (size_t i = 0; i < _params.required_slots.size(); ++i) { + if (_params.required_slots[i].slot_id == slot_id) { + column_index = i; + found = true; + break; + } + } + + if (!found) { + return Status::InternalError( + "Slot id {} from min_max_values not found in required_slots", slot_id); + } + + if (column_index >= mutate_columns.size()) { + return Status::InternalError("Column index {} out of range, mutate_columns size: {}", + column_index, mutate_columns.size()); + } + + auto& column = mutate_columns[column_index]; + RETURN_IF_ERROR(_insert_value_to_column(column, min_max_value)); + } + block->set_columns(std::move(mutate_columns)); + return Status::OK(); +} + +Status IcebergTableReader::_insert_value_to_column(MutableColumnPtr& column, + const TExprMinMaxValue& value) { + if (value.has_null) { + column->insert_default(); + } + if (value.type != TPrimitiveType::NULL_TYPE) { + switch (value.type) { + case TPrimitiveType::BOOLEAN: + case TPrimitiveType::TINYINT: + case TPrimitiveType::SMALLINT: + case TPrimitiveType::INT: + case TPrimitiveType::BIGINT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_int_value), + sizeof(int64_t)); + column->insert_data(reinterpret_cast<const char*>(&value.max_int_value), + sizeof(int64_t)); + break; + } + case TPrimitiveType::DOUBLE: + case TPrimitiveType::FLOAT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_float_value), + sizeof(double)); + column->insert_data(reinterpret_cast<const 
char*>(&value.max_float_value), + sizeof(double)); + break; + } + case TPrimitiveType::DATE: { + auto min_date = static_cast<uint32_t>(value.min_int_value); + auto max_date = static_cast<uint32_t>(value.max_int_value); + column->insert_data(reinterpret_cast<const char*>(&min_date), sizeof(uint32_t)); + column->insert_data(reinterpret_cast<const char*>(&max_date), sizeof(uint32_t)); + break; + } + + default: + return Status::InternalError("Unsupported TExprNodeType {}", value.type); Review Comment: The error message at line 226 says "Unsupported TExprNodeType" but the parameter being formatted is `value.type`, which is of type `TPrimitiveType`, not `TExprNodeType`. The error message should say "Unsupported TPrimitiveType" to accurately reflect what is being checked. ```suggestion return Status::InternalError("Unsupported TPrimitiveType {}", value.type); ``` ########## be/src/vec/exec/format/table/iceberg_reader.cpp: ########## @@ -146,6 +157,82 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } +Status IcebergTableReader::_insert_min_max_value_column(Block* block) { + const auto& min_max_values = _range.min_max_values; + auto mutate_columns = block->mutate_columns(); + for (const auto& [slot_id, min_max_value] : min_max_values) { + size_t column_index = 0; + bool found = false; + for (size_t i = 0; i < _params.required_slots.size(); ++i) { + if (_params.required_slots[i].slot_id == slot_id) { + column_index = i; + found = true; + break; + } + } + + if (!found) { + return Status::InternalError( + "Slot id {} from min_max_values not found in required_slots", slot_id); + } + + if (column_index >= mutate_columns.size()) { + return Status::InternalError("Column index {} out of range, mutate_columns size: {}", + column_index, mutate_columns.size()); + } + + auto& column = mutate_columns[column_index]; + RETURN_IF_ERROR(_insert_value_to_column(column, min_max_value)); + } + block->set_columns(std::move(mutate_columns)); + return Status::OK(); +} + 
+Status IcebergTableReader::_insert_value_to_column(MutableColumnPtr& column, + const TExprMinMaxValue& value) { + if (value.has_null) { + column->insert_default(); + } + if (value.type != TPrimitiveType::NULL_TYPE) { + switch (value.type) { + case TPrimitiveType::BOOLEAN: + case TPrimitiveType::TINYINT: + case TPrimitiveType::SMALLINT: + case TPrimitiveType::INT: + case TPrimitiveType::BIGINT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_int_value), + sizeof(int64_t)); + column->insert_data(reinterpret_cast<const char*>(&value.max_int_value), + sizeof(int64_t)); Review Comment: There's an inconsistency between the Java and C++ code for handling integer types. In the Java code (IcebergUtils.java lines 1586-1587, 1591-1607), BOOLEAN, TINYINT, SMALLINT, INT, and DATE types all set min/max values using setMinIntValue/setMaxIntValue. However, in the C++ code (iceberg_reader.cpp lines 203-206), BOOLEAN, TINYINT, SMALLINT, INT, and BIGINT are all inserted using sizeof(int64_t), which writes 8 bytes per value regardless of the target column's width. This is incorrect for TINYINT (1 byte), SMALLINT (2 bytes), INT (4 bytes), and BOOLEAN (1 byte); DATE (4 bytes in Doris DATEV2) is handled in a separate case and is not affected by this problem. The insert_data call should match the actual column type size, not use sizeof(int64_t) for all integer types. For example: - BOOLEAN should insert 1 byte (bool) - TINYINT should insert 1 byte (int8_t) - SMALLINT should insert 2 bytes (int16_t) - INT should insert 4 bytes (int32_t) - BIGINT should insert 8 bytes (int64_t) The DATE handling at lines 217-222 correctly uses uint32_t and sizeof(uint32_t), which is the right approach and should be applied to other integer types as well.
```suggestion case TPrimitiveType::BOOLEAN: { auto min_val = static_cast<int8_t>(value.min_int_value); auto max_val = static_cast<int8_t>(value.max_int_value); column->insert_data(reinterpret_cast<const char*>(&min_val), sizeof(int8_t)); column->insert_data(reinterpret_cast<const char*>(&max_val), sizeof(int8_t)); break; } case TPrimitiveType::TINYINT: { auto min_val = static_cast<int8_t>(value.min_int_value); auto max_val = static_cast<int8_t>(value.max_int_value); column->insert_data(reinterpret_cast<const char*>(&min_val), sizeof(int8_t)); column->insert_data(reinterpret_cast<const char*>(&max_val), sizeof(int8_t)); break; } case TPrimitiveType::SMALLINT: { auto min_val = static_cast<int16_t>(value.min_int_value); auto max_val = static_cast<int16_t>(value.max_int_value); column->insert_data(reinterpret_cast<const char*>(&min_val), sizeof(int16_t)); column->insert_data(reinterpret_cast<const char*>(&max_val), sizeof(int16_t)); break; } case TPrimitiveType::INT: { auto min_val = static_cast<int32_t>(value.min_int_value); auto max_val = static_cast<int32_t>(value.max_int_value); column->insert_data(reinterpret_cast<const char*>(&min_val), sizeof(int32_t)); column->insert_data(reinterpret_cast<const char*>(&max_val), sizeof(int32_t)); break; } case TPrimitiveType::BIGINT: { auto min_val = static_cast<int64_t>(value.min_int_value); auto max_val = static_cast<int64_t>(value.max_int_value); column->insert_data(reinterpret_cast<const char*>(&min_val), sizeof(int64_t)); column->insert_data(reinterpret_cast<const char*>(&max_val), sizeof(int64_t)); ``` ########## be/src/vec/exec/format/table/iceberg_reader.cpp: ########## @@ -146,6 +157,82 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } +Status IcebergTableReader::_insert_min_max_value_column(Block* block) { + const auto& min_max_values = _range.min_max_values; + auto mutate_columns = block->mutate_columns(); + for (const auto& [slot_id, min_max_value] : min_max_values) { + size_t column_index = 
0; + bool found = false; + for (size_t i = 0; i < _params.required_slots.size(); ++i) { + if (_params.required_slots[i].slot_id == slot_id) { + column_index = i; + found = true; + break; + } + } + + if (!found) { + return Status::InternalError( + "Slot id {} from min_max_values not found in required_slots", slot_id); + } + + if (column_index >= mutate_columns.size()) { + return Status::InternalError("Column index {} out of range, mutate_columns size: {}", + column_index, mutate_columns.size()); + } + + auto& column = mutate_columns[column_index]; + RETURN_IF_ERROR(_insert_value_to_column(column, min_max_value)); + } + block->set_columns(std::move(mutate_columns)); + return Status::OK(); +} + +Status IcebergTableReader::_insert_value_to_column(MutableColumnPtr& column, + const TExprMinMaxValue& value) { + if (value.has_null) { + column->insert_default(); + } + if (value.type != TPrimitiveType::NULL_TYPE) { + switch (value.type) { + case TPrimitiveType::BOOLEAN: + case TPrimitiveType::TINYINT: + case TPrimitiveType::SMALLINT: + case TPrimitiveType::INT: + case TPrimitiveType::BIGINT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_int_value), + sizeof(int64_t)); + column->insert_data(reinterpret_cast<const char*>(&value.max_int_value), + sizeof(int64_t)); + break; + } + case TPrimitiveType::DOUBLE: + case TPrimitiveType::FLOAT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_float_value), + sizeof(double)); + column->insert_data(reinterpret_cast<const char*>(&value.max_float_value), + sizeof(double)); + break; + } + case TPrimitiveType::DATE: { + auto min_date = static_cast<uint32_t>(value.min_int_value); + auto max_date = static_cast<uint32_t>(value.max_int_value); + column->insert_data(reinterpret_cast<const char*>(&min_date), sizeof(uint32_t)); + column->insert_data(reinterpret_cast<const char*>(&max_date), sizeof(uint32_t)); + break; + } Review Comment: The case at line 217 handles `TPrimitiveType::DATE`, but in the Java code 
(IcebergUtils.java line 1605), when the slot type is DATE, it sets the thrift type to `TPrimitiveType.DATEV2`. This means that in the C++ code, we're looking for the wrong type constant. The case should be `case TPrimitiveType::DATEV2:` instead of `case TPrimitiveType::DATE:` to match what's sent from the frontend. Alternatively, if DATE is intentional here, then the frontend should set TPrimitiveType.DATE instead of DATEV2, but based on the codebase convention (lines 481-482 in tablet_schema.cpp show DATEV2 is 4 bytes while DATE is 3 bytes), DATEV2 seems to be the correct type to use. ########## be/src/vec/exec/format/table/iceberg_reader.cpp: ########## @@ -146,6 +157,82 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } +Status IcebergTableReader::_insert_min_max_value_column(Block* block) { + const auto& min_max_values = _range.min_max_values; + auto mutate_columns = block->mutate_columns(); + for (const auto& [slot_id, min_max_value] : min_max_values) { + size_t column_index = 0; + bool found = false; + for (size_t i = 0; i < _params.required_slots.size(); ++i) { + if (_params.required_slots[i].slot_id == slot_id) { + column_index = i; + found = true; + break; + } + } + + if (!found) { + return Status::InternalError( + "Slot id {} from min_max_values not found in required_slots", slot_id); + } + + if (column_index >= mutate_columns.size()) { + return Status::InternalError("Column index {} out of range, mutate_columns size: {}", + column_index, mutate_columns.size()); + } + + auto& column = mutate_columns[column_index]; + RETURN_IF_ERROR(_insert_value_to_column(column, min_max_value)); + } + block->set_columns(std::move(mutate_columns)); + return Status::OK(); +} + +Status IcebergTableReader::_insert_value_to_column(MutableColumnPtr& column, + const TExprMinMaxValue& value) { + if (value.has_null) { + column->insert_default(); + } + if (value.type != TPrimitiveType::NULL_TYPE) { + switch (value.type) { + case TPrimitiveType::BOOLEAN: + 
case TPrimitiveType::TINYINT: + case TPrimitiveType::SMALLINT: + case TPrimitiveType::INT: + case TPrimitiveType::BIGINT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_int_value), + sizeof(int64_t)); + column->insert_data(reinterpret_cast<const char*>(&value.max_int_value), + sizeof(int64_t)); + break; + } + case TPrimitiveType::DOUBLE: + case TPrimitiveType::FLOAT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_float_value), + sizeof(double)); + column->insert_data(reinterpret_cast<const char*>(&value.max_float_value), + sizeof(double)); + break; + } + case TPrimitiveType::DATE: { + auto min_date = static_cast<uint32_t>(value.min_int_value); + auto max_date = static_cast<uint32_t>(value.max_int_value); + column->insert_data(reinterpret_cast<const char*>(&min_date), sizeof(uint32_t)); + column->insert_data(reinterpret_cast<const char*>(&max_date), sizeof(uint32_t)); + break; + } + + default: + return Status::InternalError("Unsupported TExprNodeType {}", value.type); + } + } + while (column->size() < 3) { + column->insert_from(*column, 0); Review Comment: The logic at lines 229-231 uses `insert_from(*column, 0)` to pad the column to size 3. However, this only works correctly if the column already has at least one element. If `value.type == TPrimitiveType::NULL_TYPE`, no values are inserted (line 196 checks this), which means the column might still be empty when we reach line 229. In that case, `insert_from(*column, 0)` will fail because there's no element at index 0 to copy from. The code should either: 1. Ensure at least one element is inserted before the while loop (e.g., insert a default value for NULL_TYPE case), or 2. Handle the empty column case explicitly by inserting default values instead of trying to copy from index 0 ```suggestion if (column->empty()) { // When no values were inserted (e.g., NULL_TYPE without has_null), // pad the column with default values instead of copying from index 0. 
while (column->size() < 3) { column->insert_default(); } } else { while (column->size() < 3) { column->insert_from(*column, 0); } ``` ########## regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_min_max.groovy: ########## @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_iceberg_optimize_min_max", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_optimize_count" + + try { + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """ switch ${catalog_name} """ + sql """ use format_v2 """ + + sql """ set enable_iceberg_min_max_optimization=false; """ + String SQLSTR = """ + MIN(id) AS id_min, + MAX(id) AS id_max, + MIN(col_boolean) AS col_boolean_min, + MAX(col_boolean) AS col_boolean_max, + MIN(col_short) AS col_short_min, + MAX(col_short) AS col_short_max, + MIN(col_byte) AS col_byte_min, + MAX(col_byte) AS col_byte_max, + MIN(col_integer) AS col_integer_min, + MAX(col_integer) AS col_integer_max, + MIN(col_long) AS col_long_min, + MAX(col_long) AS col_long_max, + MIN(col_float) AS col_float_min, + MAX(col_float) AS col_float_max, + MIN(col_double) AS col_double_min, + MAX(col_double) AS col_double_max, + MIN(col_date) AS col_date_min, + MAX(col_date) AS col_date_max + """; + def sqlstr1 = """ SELECT ${SQLSTR} FROM sample_mor_parquet; """ + def sqlstr2 = """ select ${SQLSTR} from sample_cow_parquet; """ + def sqlstr3 = """ select ${SQLSTR} from sample_mor_orc; """ + def sqlstr4 = """ select ${SQLSTR} from sample_mor_parquet; """ + + // don't use push down count + sql """ set 
enable_iceberg_min_max_optimization=false; """ + qt_false01 """${sqlstr1}""" + qt_false02 """${sqlstr2}""" + qt_false03 """${sqlstr3}""" + qt_false04 """${sqlstr4}""" + + // use push down count + sql """ set enable_iceberg_min_max_optimization=true; """ + for (String val: ["1K", "0"]) { + sql "set file_split_size=${val}" + qt_q01 """${sqlstr1}""" + qt_q02 """${sqlstr2}""" + qt_q03 """${sqlstr3}""" + qt_q04 """${sqlstr4}""" + } + sql "unset variable file_split_size;" + + // traditional mode + sql """set num_files_in_batch_mode=100000""" + explain { + sql("""select * from sample_cow_orc""") + notContains "approximate" + } + explain { + sql("""${sqlstr1}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_cow_parquet""") + notContains "approximate" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_mor_orc""") + notContains "approximate" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=MINMAX""" + } + // because it has dangling delete + explain { + sql("""${sqlstr4}""") + contains """pushdown agg=MINMAX""" + } + + // batch mode + sql """set num_files_in_batch_mode=1""" + explain { + sql("""select * from sample_cow_orc""") + contains "approximate" + } + explain { + sql("""${sqlstr1}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_cow_parquet""") + contains "approximate" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_mor_orc""") + contains "approximate" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=MINMAX""" + } + explain { + sql("""select * from sample_mor_parquet""") + contains "approximate" + } + // because it has dangling delete + explain { + sql("""${sqlstr4}""") + contains """pushdown agg=MINMAX""" + contains "approximate" + } + + // don't use push down count + sql """ set 
enable_iceberg_min_max_optimization=false; """ + + explain { + sql("""${sqlstr1}""") + contains """pushdown agg=NONE""" + } + explain { + sql("""${sqlstr2}""") + contains """pushdown agg=NONE""" + } + explain { + sql("""${sqlstr3}""") + contains """pushdown agg=NONE""" + } + explain { + sql("""${sqlstr4}""") + contains """pushdown agg=NONE""" + } + + // There has `dangling delete` after rewrite + sql """ set enable_iceberg_min_max_optimization=true; """ + def sqlstr5 = """ select min(id),max(id) from ${catalog_name}.test_db.dangling_delete_after_write; """ + explain { + sql("""${sqlstr5}""") + contains """pushdown agg=MINMAX""" + } + qt_q09 """${sqlstr5}""" + + sql """ set enable_iceberg_min_max_optimization=false""" + explain { + sql("""${sqlstr5}""") + contains """pushdown agg=NONE""" + } + qt_q10 """${sqlstr5}""" + + sql """ set enable_iceberg_min_max_optimization=true; """ + // "col_timestamp", "col_timestamp_ntz","col_decimal" is not supported, but show pushdown agg=MINMAX + // need to fix it later in explain + for (String val: [ "col_char", "col_varchar", "col_string", "col_binary","city"]) { + explain { + sql """select min(${val}),max(${val}) from ${catalog_name}.format_v2.sample_cow_orc;""" + contains """pushdown agg=NONE""" + } + explain { + sql """select min(${val}),max(${val}) from ${catalog_name}.format_v2.sample_cow_parquet;""" + contains """pushdown agg=NONE""" + } + explain { + sql """select min(${val}),max(${val}) from ${catalog_name}.format_v2.sample_mor_orc;""" + contains """pushdown agg=NONE""" + } + explain { + sql """select min(${val}),max(${val}) from ${catalog_name}.format_v2.sample_mor_parquet;""" + contains """pushdown agg=NONE""" + } + } + + + } finally { + // sql """drop catalog if exists ${catalog_name}""" Review Comment: The cleanup code at line 218 is commented out. This means the test catalog will not be cleaned up after the test runs, which can cause resource leaks in test environments. 
The cleanup should be uncommented to ensure proper resource management. ```suggestion sql """drop catalog if exists ${catalog_name}""" ``` ########## regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_min_max.groovy: ########## @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_iceberg_optimize_min_max", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_optimize_count" + + try { + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """ switch ${catalog_name} """ + sql """ use format_v2 """ + + sql """ set enable_iceberg_min_max_optimization=false; """ + String SQLSTR = """ + MIN(id) AS id_min, + MAX(id) AS id_max, + MIN(col_boolean) AS col_boolean_min, + MAX(col_boolean) AS col_boolean_max, + MIN(col_short) AS col_short_min, + MAX(col_short) AS col_short_max, + MIN(col_byte) AS col_byte_min, + MAX(col_byte) AS col_byte_max, + MIN(col_integer) AS col_integer_min, + MAX(col_integer) AS col_integer_max, + MIN(col_long) AS col_long_min, + MAX(col_long) AS col_long_max, + MIN(col_float) AS col_float_min, + MAX(col_float) AS col_float_max, + MIN(col_double) AS col_double_min, + MAX(col_double) AS col_double_max, + MIN(col_date) AS col_date_min, + MAX(col_date) AS col_date_max + """; + def sqlstr1 = """ SELECT ${SQLSTR} FROM sample_mor_parquet; """ + def sqlstr2 = """ select ${SQLSTR} from sample_cow_parquet; """ + def sqlstr3 = """ select ${SQLSTR} from sample_mor_orc; """ + def sqlstr4 = """ select ${SQLSTR} from sample_mor_parquet; """ + + // don't use push down count + sql """ set 
enable_iceberg_min_max_optimization=false; """ + qt_false01 """${sqlstr1}""" + qt_false02 """${sqlstr2}""" + qt_false03 """${sqlstr3}""" + qt_false04 """${sqlstr4}""" + + // use push down count Review Comment: The comment at line 78 says "use push down count" which is incorrect. This should say "use push down min/max" since this test is about min/max optimization, not count optimization. This is likely a copy-paste error from the count optimization test. ```suggestion // use push down min/max ``` ########## be/src/vec/exec/format/table/iceberg_reader.cpp: ########## @@ -146,6 +157,82 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } +Status IcebergTableReader::_insert_min_max_value_column(Block* block) { + const auto& min_max_values = _range.min_max_values; + auto mutate_columns = block->mutate_columns(); + for (const auto& [slot_id, min_max_value] : min_max_values) { + size_t column_index = 0; + bool found = false; + for (size_t i = 0; i < _params.required_slots.size(); ++i) { + if (_params.required_slots[i].slot_id == slot_id) { + column_index = i; + found = true; + break; + } + } + + if (!found) { + return Status::InternalError( + "Slot id {} from min_max_values not found in required_slots", slot_id); + } + + if (column_index >= mutate_columns.size()) { + return Status::InternalError("Column index {} out of range, mutate_columns size: {}", + column_index, mutate_columns.size()); + } + + auto& column = mutate_columns[column_index]; + RETURN_IF_ERROR(_insert_value_to_column(column, min_max_value)); + } + block->set_columns(std::move(mutate_columns)); + return Status::OK(); +} + +Status IcebergTableReader::_insert_value_to_column(MutableColumnPtr& column, + const TExprMinMaxValue& value) { + if (value.has_null) { + column->insert_default(); + } + if (value.type != TPrimitiveType::NULL_TYPE) { + switch (value.type) { + case TPrimitiveType::BOOLEAN: + case TPrimitiveType::TINYINT: + case TPrimitiveType::SMALLINT: + case TPrimitiveType::INT: + 
case TPrimitiveType::BIGINT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_int_value), + sizeof(int64_t)); + column->insert_data(reinterpret_cast<const char*>(&value.max_int_value), + sizeof(int64_t)); + break; + } + case TPrimitiveType::DOUBLE: + case TPrimitiveType::FLOAT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_float_value), + sizeof(double)); + column->insert_data(reinterpret_cast<const char*>(&value.max_float_value), + sizeof(double)); + break; + } + case TPrimitiveType::DATE: { + auto min_date = static_cast<uint32_t>(value.min_int_value); + auto max_date = static_cast<uint32_t>(value.max_int_value); + column->insert_data(reinterpret_cast<const char*>(&min_date), sizeof(uint32_t)); + column->insert_data(reinterpret_cast<const char*>(&max_date), sizeof(uint32_t)); + break; + } + + default: + return Status::InternalError("Unsupported TExprNodeType {}", value.type); + } + } + while (column->size() < 3) { + column->insert_from(*column, 0); Review Comment: When `value.has_null` is true, line 194 calls `column->insert_default()` to insert a null/default value. Independently of that, whenever `value.type` is not NULL_TYPE (lines 196-228), the code inserts exactly 2 more values (min and max). Then lines 229-231 pad the column to size 3 by copying from index 0. This creates an inconsistency: - If has_null is true: the column will have [null, min, max] (already 3 elements, so no padding occurs) - If has_null is false: the column will have [min, max, min] (the first element is duplicated to reach size 3) This asymmetry seems unintentional. The padding logic should be reconsidered to ensure consistent behavior regardless of whether nulls exist. It's unclear why exactly 3 rows are needed - this magic number should be documented or the logic should be restructured. ```suggestion // Ensure there are at least three rows: [null (optional), min, max]. // Some downstream logic expects a fixed number of rows for min/max filters.
constexpr size_t kMinMaxValueRowCount = 3; if (column->empty()) { // No value was inserted (e.g. NULL_TYPE without has_null); pad with defaults. while (column->size() < kMinMaxValueRowCount) { column->insert_default(); } } else { // Duplicate the last valid value (typically max) to reach the required size. while (column->size() < kMinMaxValueRowCount) { size_t last_index = column->size() - 1; column->insert_from(*column, last_index); } ``` ########## regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_min_max.groovy: ########## @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_iceberg_optimize_min_max", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_optimize_count" + + try { + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """ switch ${catalog_name} """ + sql """ use format_v2 """ + + sql """ set enable_iceberg_min_max_optimization=false; """ + String SQLSTR = """ + MIN(id) AS id_min, + MAX(id) AS id_max, + MIN(col_boolean) AS col_boolean_min, + MAX(col_boolean) AS col_boolean_max, + MIN(col_short) AS col_short_min, + MAX(col_short) AS col_short_max, + MIN(col_byte) AS col_byte_min, + MAX(col_byte) AS col_byte_max, + MIN(col_integer) AS col_integer_min, + MAX(col_integer) AS col_integer_max, + MIN(col_long) AS col_long_min, + MAX(col_long) AS col_long_max, + MIN(col_float) AS col_float_min, + MAX(col_float) AS col_float_max, + MIN(col_double) AS col_double_min, + MAX(col_double) AS col_double_max, + MIN(col_date) AS col_date_min, + MAX(col_date) AS col_date_max + """; + def sqlstr1 = """ SELECT ${SQLSTR} FROM sample_mor_parquet; """ + def sqlstr2 = """ select ${SQLSTR} from sample_cow_parquet; """ + def sqlstr3 = """ select ${SQLSTR} from sample_mor_orc; """ + def sqlstr4 = """ select ${SQLSTR} from sample_mor_parquet; """ + + // don't use push down count + sql """ set 
enable_iceberg_min_max_optimization=false; """ + qt_false01 """${sqlstr1}""" + qt_false02 """${sqlstr2}""" + qt_false03 """${sqlstr3}""" + qt_false04 """${sqlstr4}""" + + // use push down count Review Comment: The comment at line 71 says "don't use push down count", which is incorrect. This should say "don't use push down min/max" or similar, since this test is about min/max optimization, not count optimization. This is likely a copy-paste error from the count optimization test. ```suggestion // don't use push down min/max sql """ set enable_iceberg_min_max_optimization=false; """ qt_false01 """${sqlstr1}""" qt_false02 """${sqlstr2}""" qt_false03 """${sqlstr3}""" qt_false04 """${sqlstr4}""" // use push down min/max ``` ########## be/src/vec/exec/format/table/iceberg_reader.cpp: ########## @@ -146,6 +157,82 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } +Status IcebergTableReader::_insert_min_max_value_column(Block* block) { + const auto& min_max_values = _range.min_max_values; + auto mutate_columns = block->mutate_columns(); + for (const auto& [slot_id, min_max_value] : min_max_values) { + size_t column_index = 0; + bool found = false; + for (size_t i = 0; i < _params.required_slots.size(); ++i) { + if (_params.required_slots[i].slot_id == slot_id) { + column_index = i; + found = true; + break; + } + } + + if (!found) { + return Status::InternalError( + "Slot id {} from min_max_values not found in required_slots", slot_id); + } + + if (column_index >= mutate_columns.size()) { + return Status::InternalError("Column index {} out of range, mutate_columns size: {}", + column_index, mutate_columns.size()); + } + + auto& column = mutate_columns[column_index]; + RETURN_IF_ERROR(_insert_value_to_column(column, min_max_value)); + } + block->set_columns(std::move(mutate_columns)); + return Status::OK(); +} + +Status IcebergTableReader::_insert_value_to_column(MutableColumnPtr& column, + const TExprMinMaxValue& value) { + if (value.has_null) { + 
column->insert_default(); + } + if (value.type != TPrimitiveType::NULL_TYPE) { + switch (value.type) { + case TPrimitiveType::BOOLEAN: + case TPrimitiveType::TINYINT: + case TPrimitiveType::SMALLINT: + case TPrimitiveType::INT: + case TPrimitiveType::BIGINT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_int_value), + sizeof(int64_t)); + column->insert_data(reinterpret_cast<const char*>(&value.max_int_value), + sizeof(int64_t)); + break; + } + case TPrimitiveType::DOUBLE: + case TPrimitiveType::FLOAT: { + column->insert_data(reinterpret_cast<const char*>(&value.min_float_value), + sizeof(double)); + column->insert_data(reinterpret_cast<const char*>(&value.max_float_value), + sizeof(double)); + break; + } Review Comment: Similar to the integer types issue, FLOAT types should use sizeof(float) instead of sizeof(double). The Java code sets min_float_value and max_float_value for FLOAT type (line 1624-1625), but the C++ code inserts it as sizeof(double) (line 212). This size mismatch can lead to incorrect data being written to the column. FLOAT should insert 4 bytes (float) while DOUBLE should insert 8 bytes (double). 
```suggestion case TPrimitiveType::DOUBLE: { column->insert_data(reinterpret_cast<const char*>(&value.min_float_value), sizeof(double)); column->insert_data(reinterpret_cast<const char*>(&value.max_float_value), sizeof(double)); break; } case TPrimitiveType::FLOAT: { float min_val = static_cast<float>(value.min_float_value); float max_val = static_cast<float>(value.max_float_value); column->insert_data(reinterpret_cast<const char*>(&min_val), sizeof(float)); column->insert_data(reinterpret_cast<const char*>(&max_val), sizeof(float)); break; } ``` ########## be/src/vec/exec/scan/file_scanner.cpp: ########## @@ -1644,6 +1646,9 @@ Status FileScanner::_init_expr_ctxes() { return Status::InternalError( fmt::format("Unknown source slot descriptor, slot_id={}", slot_id)); } + if (!_current_range.min_max_values.contains(slot_id)) { + _could_use_iceberg_min_max_optimization = false; + } Review Comment: The logic at lines 1649-1651 sets `_could_use_iceberg_min_max_optimization` to false if ANY slot is missing from `min_max_values`. However, this happens inside a loop that iterates over `_params->required_slots`. This means that the optimization is disabled if even a single slot doesn't have min/max values. The issue is that this check happens AFTER line 1640 already read `_current_range.could_use_iceberg_min_max_optimization`. The variable is being overwritten during iteration, which could lead to inconsistent behavior. A better approach would be to check ALL required slots first before setting the final value, or rely on the frontend to properly set `could_use_iceberg_min_max_optimization` to false when any required slot is missing min/max values. The current implementation has the right intent but executes it in a way that's hard to reason about. 
########## regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_min_max.groovy: ########## @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_optimize_min_max", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_optimize_count" Review Comment: The catalog name is `test_iceberg_optimize_count` which appears to be a copy-paste error from the count optimization feature. Since this PR is about min/max optimization, the catalog name should be something like `test_iceberg_optimize_min_max` to match the test suite name and make the test more maintainable. ```suggestion String catalog_name = "test_iceberg_optimize_min_max" ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
