This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 674601cb8eb3a54535fc9ae512e81ca7ce8c10c8
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Sun Sep 10 19:45:01 2023 +0800

    [Fix](spark-load) ignore column name case in spark load (#23947)

    Doris is not case sensitive to field names, so when doing spark load,
    we can convert all fields to lowercase for matching and loading.
---
 be/src/olap/push_handler.cpp                               |  2 +-
 .../java/org/apache/doris/load/loadv2/SparkLoadJob.java    | 13 +++++++++++--
 .../org/apache/doris/load/loadv2/SparkLoadPendingTask.java |  3 ++-
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index ca4f6c1717..841ad0ecdf 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -384,7 +384,7 @@ Status PushBrokerReader::init() {

     auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
     for (int i = 0; i < slot_descs.size(); i++) {
-        _all_col_names.push_back(slot_descs[i]->col_name());
+        _all_col_names.push_back(to_lower((slot_descs[i]->col_name())));
     }

     RETURN_IF_ERROR(_init_expr_ctxes());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index bbf06b21da..09a85d3dff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -103,6 +103,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;

@@ -406,7 +407,13 @@ public class SparkLoadJob extends BulkLoadJob {
     private PushBrokerReaderParams getPushBrokerReaderParams(OlapTable table, long indexId) throws UserException {
         if (!indexToPushBrokerReaderParams.containsKey(indexId)) {
             PushBrokerReaderParams pushBrokerReaderParams = new PushBrokerReaderParams();
-            pushBrokerReaderParams.init(table.getSchemaByIndexId(indexId), brokerDesc);
+            List<Column> columns = new ArrayList<>();
+            table.getSchemaByIndexId(indexId).forEach(col -> {
+                Column column = new Column(col);
+                column.setName(col.getName().toLowerCase(Locale.ROOT));
+                columns.add(column);
+            });
+            pushBrokerReaderParams.init(columns, brokerDesc);
             indexToPushBrokerReaderParams.put(indexId, pushBrokerReaderParams);
         }
         return indexToPushBrokerReaderParams.get(indexId);
@@ -463,7 +470,9 @@ public class SparkLoadJob extends BulkLoadJob {

             List<TColumn> columnsDesc = new ArrayList<TColumn>();
             for (Column column : olapTable.getSchemaByIndexId(indexId)) {
-                columnsDesc.add(column.toThrift());
+                TColumn tColumn = column.toThrift();
+                tColumn.setColumnName(tColumn.getColumnName().toLowerCase(Locale.ROOT));
+                columnsDesc.add(tColumn);
             }

             int bucket = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 67825c0327..b9f361ddd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -71,6 +71,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;

@@ -281,7 +282,7 @@ public class SparkLoadPendingTask extends LoadTask {

     private EtlColumn createEtlColumn(Column column) {
         // column name
-        String name = column.getName();
+        String name = column.getName().toLowerCase(Locale.ROOT);
         // column type
         PrimitiveType type = column.getDataType();
         String columnType = column.getDataType().toString();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org