This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d9bd1fad67a [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383) d9bd1fad67a is described below commit d9bd1fad67af08d4d8027035e39855535de4a433 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Mon Nov 6 09:56:46 2023 +0800 [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383) When a Unique key table carries the `enable_unique_key_merge_on_write` attribute, the value of the agg type is none. Therefore, when doing sparkload, we need to specify the agg type as `REPLACE`. --- .../org/apache/doris/load/loadv2/SparkLoadPendingTask.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index 8e9599f774b..32749fd8a77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -247,10 +247,13 @@ public class SparkLoadPendingTask extends LoadTask { long indexId = entry.getKey(); int schemaHash = table.getSchemaHashByIndexId(indexId); + boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS) + && table.getTableProperty().getEnableUniqueKeyMergeOnWrite(); + // columns List<EtlColumn> etlColumns = Lists.newArrayList(); for (Column column : entry.getValue()) { - etlColumns.add(createEtlColumn(column)); + etlColumns.add(createEtlColumn(column, changeAggType)); } // check distribution type @@ -290,7 +293,7 @@ public class SparkLoadPendingTask extends LoadTask { return etlIndexes; } - private EtlColumn createEtlColumn(Column column) { + private EtlColumn createEtlColumn(Column column, boolean changeAggType) { // column name String name = column.getName().toLowerCase(Locale.ROOT); // column type @@ -304,7 +307,11 @@ public class 
SparkLoadPendingTask extends LoadTask { // aggregation type String aggregationType = null; if (column.getAggregationType() != null) { - aggregationType = column.getAggregationType().toString(); + if (changeAggType && !column.isKey()) { + aggregationType = AggregateType.REPLACE.toSql(); + } else { + aggregationType = column.getAggregationType().toString(); + } } // default value --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org