Copilot commented on code in PR #58396:
URL: https://github.com/apache/doris/pull/58396#discussion_r2570899819
##########
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:
##########
@@ -174,9 +276,19 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block)
Block transformed_block;
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
transformed_block.reserve(_iceberg_partition_columns.size());
-    for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-        transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
-                output_block, iceberg_partition_columns.source_idx()));
+    for (int i = 0; i < _iceberg_partition_columns.size(); ++i) {
+        auto& iceberg_partition_columns = _iceberg_partition_columns[i];
+        if (_has_static_partition && _partition_column_is_static[i]) {
+            auto result_type =
+                    iceberg_partition_columns.partition_column_transform().get_result_type();
+            auto col = result_type->create_column_const_with_default_value(output_block.rows());
Review Comment:
In hybrid mode (lines 281-291), when a partition column is static, a constant column with default values is created (line 284), but the actual static partition value is never used to populate it. As a result, `transformed_block` contains incorrect data for static partition columns. While this might not affect the final output, because `_partition_to_path` and `_partition_values` use `_partition_column_static_values` directly (lines 435-437, 466-468), carrying incorrect data in the intermediate `transformed_block` is confusing and could lead to bugs if the code is modified later. The constant column should be populated with the actual static partition value from `_partition_column_static_values[i]` instead of the default value.
```suggestion
            // Create a column with the actual static partition value
            auto data_col = result_type->create_column();
            result_type->get_default_serializer()->deserialize_column(
                    {_partition_column_static_values[i].data(),
                     _partition_column_static_values[i].size()},
                    *data_col, 1);
            auto col = ColumnConst::create(std::move(data_col), output_block.rows());
```
##########
regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out:
##########
@@ -0,0 +1,104 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1 Alice 2025-01-25 bj
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q02 --
+10 Eve 2025-01-25 bj
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q03_before --
+1 Alice 2025-01-25 bj
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q03_after --
+10 Eve 2025-01-25 bj
+11 Frank 2025-01-25 sh
+12 Grace 2025-01-25 gz
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q03_partition_25 --
+10 Eve 2025-01-25 bj
+11 Frank 2025-01-25 sh
+12 Grace 2025-01-25 gz
+
+-- !q03_partition_26 --
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q04 --
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+
+-- !q05 --
+10 Eve 2025-01-25 bj 100
+2 Bob 2025-01-25 bj 200
+3 Charlie 2025-01-25 sh 100
+4 David 2025-01-26 bj 100
+
+-- !q06_before --
+1 Alice 2025-01-25 bj food
+2 Bob 2025-01-25 bj drink
+3 Charlie 2025-01-25 sh food
+4 David 2025-01-26 bj food
+
+-- !q06_after --
+10 Eve 2025-01-25 bj electronics
+11 Frank 2025-01-25 bj clothing
+3 Charlie 2025-01-25 sh food
+4 David 2025-01-26 bj food
+
+-- !q06_static_partition --
+10 Eve 2025-01-25 bj electronics
+11 Frank 2025-01-25 bj clothing
+
+-- !q06_other_partitions --
+3 Charlie 2025-01-25 sh food
+4 David 2025-01-26 bj food
+
+-- !q07 --
+10 Eve 1706140800000
+2 Bob 1706227200000
+3 Charlie 1706313600000
+
+-- !q08 --
+10 Eve 1706140800000 bj
+11 Frank 1706140800000 sh
+3 Charlie 1706227200000 bj
+
+-- !q09 --
+10 Eve 85.5
+2 Bob 90.0
+3 Charlie 75.5
+
+-- !q10 --
+10 Eve 85.5 1
+11 Frank 85.5 2
+3 Charlie 90.0 1
+
+-- !q11 --
+10 Eve 99.98999999999999
+2 Bob 199.99
+3 Charlie 299.99
+
+-- !q12 --
+10 Eve 99.98999999999999 A
+11 Frank 99.98999999999999 B
+3 Charlie 199.99 A
+
+-- !q13 --
+10 Eve true
+2 Bob false
+
+-- !q14 --
+10 Eve true 1
+11 Frank true 2
+3 Charlie false 1
+
Review Comment:
The test file is missing expected output for queries q15-q20. The test script contains Test Cases 15-20 (lines 549-741), which execute queries q15 through q20, but this output file only contains results up to q14, so those test cases will fail when executed.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java:
##########
@@ -797,6 +797,72 @@ public static Literal<?> parseIcebergLiteral(String value, org.apache.iceberg.ty
}
}
+    /**
+     * Convert human-readable partition value string to appropriate Java type for
+     * Iceberg expression.
+     * This is used for static partition overwrite where user specifies partition
+     * values like PARTITION (dt='2025-01-01', region='bj').
+     *
+     * @param valueStr Partition value as human-readable string (e.g.,
+     *                 "2025-01-01" for date)
+     * @param icebergType Iceberg type of the partition field
+     * @return Converted value object suitable for Iceberg Expression, or null if
+     *         value is null
+     */
+    public static Object parsePartitionValueFromString(String valueStr, org.apache.iceberg.types.Type icebergType) {
+ if (valueStr == null) {
+ return null;
+ }
+
+ try {
+ switch (icebergType.typeId()) {
+ case STRING:
+ return valueStr;
+ case INTEGER:
+ return Integer.parseInt(valueStr);
+ case LONG:
+ return Long.parseLong(valueStr);
+ case FLOAT:
+ return Float.parseFloat(valueStr);
+ case DOUBLE:
+ return Double.parseDouble(valueStr);
+ case BOOLEAN:
+ return Boolean.parseBoolean(valueStr);
+ case DATE:
+ // Parse date string (format: yyyy-MM-dd) to epoch day
+                return (int) LocalDate.parse(valueStr, DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay();
+ case TIMESTAMP:
+                // Parse timestamp string (format: yyyy-MM-dd HH:mm:ss or ISO format) to
+                // microseconds
+                return parseTimestampToMicros(valueStr, (TimestampType) icebergType);
+ case DECIMAL:
+ return new BigDecimal(valueStr);
+ default:
+                throw new IllegalArgumentException("Unsupported partition value type: " + icebergType);
+ }
+ } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("Failed to convert partition value '%s' to type %s",
+ valueStr, icebergType), e);
+ }
+ }
+
+    private static long parseTimestampToMicros(String valueStr, TimestampType timestampType) {
+        LocalDateTime ldt;
+        try {
+            ldt = LocalDateTime.parse(valueStr, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+        } catch (Exception e) {
+            // Try alternative format: yyyy-MM-dd HH:mm:ss
+            ldt = LocalDateTime.parse(valueStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+        }
+
+        // Convert to microseconds
+        if (timestampType.shouldAdjustToUTC()) {
+            return ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() * 1000;
+        } else {
+            return ldt.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000;
+        }
Review Comment:
The timestamp parsing logic has potential issues with timezone handling. When ISO format parsing fails (line 852), the exception is caught and ignored without logging, and the code falls back to the "yyyy-MM-dd HH:mm:ss" format on line 855; if both formats fail, line 855 throws an exception that propagates up. More importantly, the timezone handling on lines 859-862 may be incorrect:
- When `shouldAdjustToUTC()` is true, it converts using the system default timezone (line 860).
- When false, it assumes the input is already in UTC (line 862).
This is confusing and could lead to incorrect timestamp values. The logic should be clarified or documented well enough to explain when each branch applies.
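As an illustration of the clarification being asked for, here is a minimal standalone sketch that keeps both branches but documents them and makes the zone explicit. It assumes the usual Iceberg semantics (timestamptz literals are wall-clock times in a session zone, plain timestamps are zone-less); the class and method names are hypothetical, not the PR's code:
```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class PartitionTimestampParser {

    /**
     * Parses "yyyy-MM-dd'T'HH:mm:ss" (ISO) or "yyyy-MM-dd HH:mm:ss" into
     * microseconds since epoch. The adjustToUtc flag mirrors shouldAdjustToUTC():
     *   true  (timestamptz) -> the literal is a wall-clock time in sessionZone,
     *                          so normalize through that zone to a UTC instant;
     *   false (timestamp)   -> the literal is zone-less, so store the wall-clock
     *                          value as if it were UTC so it round-trips
     *                          unchanged on read.
     */
    static long toMicros(String valueStr, boolean adjustToUtc, ZoneId sessionZone) {
        LocalDateTime ldt;
        try {
            ldt = LocalDateTime.parse(valueStr, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
        } catch (DateTimeParseException ignored) {
            // Fallback format; if this also fails, the DateTimeParseException
            // propagates with the offending input in its message.
            ldt = LocalDateTime.parse(valueStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        }
        ZoneId zone = adjustToUtc ? sessionZone : ZoneOffset.UTC;
        // Inputs have at most second resolution, so millis * 1000 is lossless here.
        return ldt.atZone(zone).toInstant().toEpochMilli() * 1000L;
    }
}
```
Taking the zone as a parameter instead of calling `ZoneId.systemDefault()` inside the method would also make the timestamptz branch deterministic across FE hosts with different default zones.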
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java:
##########
@@ -605,16 +629,99 @@ private Plan bindIcebergTableSink(MatchingContext<UnboundIcebergTableSink<Plan>>
Optional.empty(),
Optional.empty(),
child);
-        // we need to insert all the columns of the target table
+
+        // Check column count: SELECT columns should match bindColumns (excluding static
+        // partition columns)
         if (boundSink.getCols().size() != child.getOutput().size()) {
-            throw new AnalysisException("insert into cols should be corresponding to the query output");
+            throw new AnalysisException("insert into cols should be corresponding to the query output. "
+                    + "Expected " + boundSink.getCols().size() + " columns but got " + child.getOutput().size());
         }
+
         Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
                 boundSink, child);
+
+        // For static partition columns, add constant expressions from PARTITION clause
+        // This ensures partition column values are written to the data file
+        if (!staticPartitionColNames.isEmpty()) {
+            for (Map.Entry<String, Expression> entry : staticPartitions.entrySet()) {
+                String colName = entry.getKey();
+                Expression valueExpr = entry.getValue();
+                Column column = table.getColumn(colName);
+                if (column != null) {
+                    // Cast the literal to the correct column type
+                    Expression castExpr = TypeCoercionUtils.castIfNotSameType(
+                            valueExpr, DataType.fromCatalogType(column.getType()));
+                    columnToOutput.put(colName, new Alias(castExpr, colName));
+                }
+            }
+        }
+
         LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
+    /**
+     * Validate static partition specification for Iceberg table
+     */
+    private void validateStaticPartition(UnboundIcebergTableSink<?> sink, IcebergExternalTable table) {
+        Map<String, Expression> staticPartitions = sink.getStaticPartitionKeyValues();
+        if (staticPartitions == null || staticPartitions.isEmpty()) {
+            return;
+        }
+
+        Table icebergTable = table.getIcebergTable();
+        PartitionSpec partitionSpec = icebergTable.spec();
+
+        // Check if table is partitioned
+        if (!partitionSpec.isPartitioned()) {
+            throw new AnalysisException(
+                    String.format("Table %s is not partitioned, cannot use static partition syntax", table.getName()));
+        }
+
+        // Get partition field names
+        Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
+        for (PartitionField field : partitionSpec.fields()) {
+            String fieldName = field.name();
+            partitionFieldMap.put(fieldName, field);
+        }
+
+        // Validate each static partition column
+        for (Map.Entry<String, Expression> entry : staticPartitions.entrySet()) {
+            String partitionColName = entry.getKey();
+            Expression partitionValue = entry.getValue();
+
+            // 1. Check if partition column exists
+            if (!partitionFieldMap.containsKey(partitionColName)) {
+                throw new AnalysisException(
+                        String.format("Unknown partition column '%s' in table '%s'. Available partition columns: %s",
+                                partitionColName, table.getName(), partitionFieldMap.keySet()));
+            }
+
+            // 2. Check if it's an identity partition.
+            // Static partition overwrite is only supported for identity partitions.
+            PartitionField field = partitionFieldMap.get(partitionColName);
+            if (!field.transform().isIdentity()) {
+                throw new AnalysisException(
+                        String.format("Cannot use static partition syntax for non-identity partition field '%s'"
+                                + " (transform: %s).", partitionColName, field.transform().toString()));
+            }
+
+            // 3. Check if partition value is a constant expression
+            if (!partitionValue.isConstant()) {
+                throw new AnalysisException(
+                        String.format("Partition value for column '%s' must be a constant expression, but got: %s",
+                                partitionColName, partitionValue));
+            }
+
+            // 4. Validate partition value type
+            if (!(partitionValue instanceof Literal)) {
+                throw new AnalysisException(
+                        String.format("Partition value for column '%s' must be a literal, but got: %s",
+                                partitionColName, partitionValue));
+            }
Review Comment:
[nitpick] The validation checks if `partitionValue.isConstant()` on line
710, and then checks if it's an instance of `Literal` on line 717. These checks
are redundant - if something is a `Literal`, it's already constant. The
`isConstant()` check could be removed, or the comment should explain why both
checks are needed if there's a subtle distinction.
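For concreteness, a hedged sketch of the collapsed form, as a drop-in replacement for checks 3 and 4 (the error wording is illustrative, not from the PR):
```java
            // A Literal is necessarily constant, so one instanceof test suffices,
            // unless constant-but-non-literal expressions (e.g., foldable
            // arithmetic) are meant to get a distinct error message.
            if (!(partitionValue instanceof Literal)) {
                throw new AnalysisException(
                        String.format("Partition value for column '%s' must be a constant literal, but got: %s",
                                partitionColName, partitionValue));
            }
```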
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java:
##########
@@ -1434,13 +1433,64 @@ public Pair<Boolean, List<String>> visitPartitionSpec(PartitionSpecContext ctx)
partitions = null;
} else if (ctx.partition != null) {
partitions = ImmutableList.of(ctx.partition.getText());
+        } else if (ctx.partitionKeyValue() != null && !ctx.partitionKeyValue().isEmpty()) {
+            // Static partition: PARTITION (col1='val1', col2='val2')
+            // For backward compatibility with callers expecting Pair<Boolean, List<String>>,
+            // return empty list here. Use parseInsertPartitionSpec() for full support.
+            partitions = ImmutableList.of();
Review Comment:
[nitpick] When static partition syntax is detected (lines 1436-1440), the
method returns an empty list for backward compatibility. However, this is not
clearly documented and could cause confusion for callers of this method who
don't expect an empty list for static partitions. Consider adding a comment
explaining that static partition handling requires using
`parseInsertPartitionSpec()` instead, or update the method documentation.
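One possible shape for that documentation, sketched here with hypothetical wording:
```java
    /**
     * ...existing description...
     *
     * <p>Note: when the PARTITION clause carries key=value pairs (static
     * partition syntax), this method returns an empty partition list for
     * backward compatibility. Callers that need the static key/value pairs
     * must use parseInsertPartitionSpec() instead.
     */
    public Pair<Boolean, List<String>> visitPartitionSpec(PartitionSpecContext ctx) {
```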
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -394,16 +400,33 @@ private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, lon
} else if (logicalQuery instanceof UnboundHiveTableSink) {
insertCtx = new HiveInsertCommandContext();
((HiveInsertCommandContext) insertCtx).setOverwrite(true);
-        } else if (logicalQuery instanceof UnboundIcebergTableSink) {
-            insertCtx = new IcebergInsertCommandContext();
-            ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
-            branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext) insertCtx).setBranchName(branchName));
         } else {
-            throw new UserException("Current catalog does not support insert overwrite yet.");
+            throw new UserException("Current catalog does not support insert overwrite with auto-detect partition.");
Review Comment:
The removed Iceberg support in `insertIntoAutoDetect` appears to be a
regression. The method previously supported `UnboundIcebergTableSink` but now
throws an exception for all catalog types except `UnboundTableSink` and
`UnboundHiveTableSink`. This breaks auto-detect functionality for Iceberg
tables. If this removal is intentional (e.g., Iceberg doesn't support
auto-detect), it should be clearly documented or the error message should be
more specific about why Iceberg is excluded.
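If the exclusion is deliberate, an explicit branch would document it in code and keep the error actionable; a sketch (the message wording is only a suggestion):
```java
        } else if (logicalQuery instanceof UnboundIcebergTableSink) {
            // Intentionally unsupported: Iceberg overwrite is expected to go
            // through static partition overwrite or full-table overwrite.
            throw new UserException("Iceberg tables do not support insert overwrite"
                    + " with auto-detect partition; use static partition overwrite"
                    + " (PARTITION (col='value')) or a full-table overwrite instead.");
        } else {
            throw new UserException("Current catalog does not support insert overwrite"
                    + " with auto-detect partition.");
        }
```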
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]