This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 60d5e4dfce [improvement](spark-load) support parquet and orc file (#13438)
60d5e4dfce is described below

commit 60d5e4dfce1e0bb03023d632c8ded0b50e2be386
Author: liujinhui <965147...@qq.com>
AuthorDate: Thu Oct 20 08:59:22 2022 +0800

    [improvement](spark-load) support parquet and orc file (#13438)

    Add support for parquet/orc in SparkDpp.java
    Fixed sparkDpp checkstyle issue
---
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 28 ++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 74357a9afe..d64c1080c8 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -22,6 +22,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 
 import com.google.common.base.Strings;
 import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -75,7 +76,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-
 // This class is a Spark-based data preprocessing program,
 // which will make use of the distributed compute framework of spark to
 // do ETL job/sort/preaggregate jobs in spark job
@@ -87,6 +87,7 @@ import java.util.Set;
 // 2. repartition data by using doris data model(partition and bucket)
 // 3. process aggregation if needed
 // 4. write data to parquet file
+
 public final class SparkDpp implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);
 
@@ -212,7 +213,6 @@ public final class SparkDpp implements java.io.Serializable {
 
                         continue;
                     }
-
                     String curBucketKey = keyColumns.get(0).toString();
                     List<Object> columnObjects = new ArrayList<>();
                     for (int i = 1; i < keyColumns.size(); ++i) {
@@ -620,6 +620,30 @@ public final class SparkDpp implements java.io.Serializable {
         if (fileGroup.columnsFromPath != null) {
             srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
         }
+
+        if (fileGroup.fileFormat.equalsIgnoreCase("parquet")) {
+            // parquet had its own schema, just use it; perhaps we could add some validation in future.
+            Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
+            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+                for (int k = 0; k < columnValueFromPath.size(); k++) {
+                    dataFrame = dataFrame.withColumn(
+                            fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+                }
+            }
+            return dataFrame;
+        }
+
+        if (fileGroup.fileFormat.equalsIgnoreCase("orc")) {
+            Dataset<Row> dataFrame = spark.read().orc(fileUrl);
+            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
+                for (int k = 0; k < columnValueFromPath.size(); k++) {
+                    dataFrame = dataFrame.withColumn(
+                            fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
+                }
+            }
+            return dataFrame;
+        }
+
         StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
         JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
         int columnSize = dataSrcColumns.size();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
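
For context (not part of the commit): below is a minimal, self-contained sketch of the same pattern the patch relies on, reading parquet or ORC input with Spark's DataFrameReader and appending "columns from path" as literal columns via withColumn/functions.lit. The class name, file paths, and column names here are hypothetical examples, not values taken from the Doris codebase.

// Sketch only: illustrates the Spark parquet/ORC read + columns-from-path pattern
// used in SparkDpp.java. All names and paths below are made-up placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.util.Arrays;
import java.util.List;

public class ParquetOrcReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("parquet-orc-read-sketch")
                .getOrCreate();

        // Parquet (and ORC) files carry their own schema, so no source column
        // list has to be built up front, unlike the text-file code path.
        Dataset<Row> df = spark.read().parquet("hdfs:///path/to/input/*.parquet");
        // For ORC input the only difference is the reader method:
        // Dataset<Row> df = spark.read().orc("hdfs:///path/to/input/*.orc");

        // Columns derived from the file path (e.g. partition values) are appended
        // as constant columns, mirroring the withColumn/lit loop in the patch.
        List<String> columnsFromPath = Arrays.asList("dt", "city");
        List<String> valuesFromPath = Arrays.asList("2022-10-20", "beijing");
        for (int k = 0; k < valuesFromPath.size(); k++) {
            df = df.withColumn(columnsFromPath.get(k), functions.lit(valuesFromPath.get(k)));
        }

        df.show();
        spark.stop();
    }
}

The design choice in the patch follows from this: because self-describing formats already provide a schema, the parquet/ORC branches can return the Dataset directly and skip the text-parsing path that builds srcSchema and parses each line by column index.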