morningman commented on code in PR #13099:
URL: https://github.com/apache/doris/pull/13099#discussion_r995288455
##########
fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java:
##########
@@ -620,13 +627,131 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
         if (fileGroup.columnsFromPath != null) {
             srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
         }
-        StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
-        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
+        StructType srcSchema = createSrcSchema(srcColumnsWithColumnsFromPath);
         int columnSize = dataSrcColumns.size();
         List<ColumnParser> parsers = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
             parsers.add(ColumnParser.create(column));
         }
+
+        String fileFormat = fileGroup.fileFormat;
+        if (Strings.isNullOrEmpty(fileFormat)) {
+            fileFormat = "csv";
+        }
+
+        JavaRDD<Row> rowRDD = null;
+        switch (fileFormat) {
+            case "csv":
+                rowRDD = getRowsFromText(spark, fileGroup, fileUrl, baseIndex,
+                        columns, columnValueFromPath, dstColumnNameToIndex, srcSchema, columnSize, parsers);
+                break;
+            case "parquet":
+            case "orc":
+                rowRDD = getRowsFromParquetOrOrc(fileFormat, spark, fileGroup, fileUrl, baseIndex,
+                        columns, columnValueFromPath, dstColumnNameToIndex, srcSchema, columnSize, parsers);
+                break;
+            default:
+                throw new SparkDppException("Unsupport file format: " + fileFormat);
+        }
+
+        Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
+        return dataframe;
+    }
+
+    private JavaRDD<Row> getRowsFromParquetOrOrc(String fileFormat, SparkSession spark, EtlFileGroup fileGroup,
+                                                 String fileUrl,
+                                                 EtlIndex baseIndex, List<EtlColumn> columns,
+                                                 List<String> columnValueFromPath,
+                                                 Map<String, Integer> dstColumnNameToIndex, StructType srcSchema,
+                                                 int columnSize, List<ColumnParser> parsers) throws SparkDppException {
+        JavaRDD<Row> rows = null;
+        if (fileFormat.equalsIgnoreCase("parquet")) {
+            rows = spark.read().parquet(fileUrl).toJavaRDD();
+        } else if (fileFormat.equalsIgnoreCase("orc")) {
+            rows = spark.read().orc(fileUrl).toJavaRDD();
+        } else {
+            throw new SparkDppException("Unknown file format: " + fileFormat);
+        }
+
+        JavaRDD<Row> rowRDD = rows.flatMap(
+                record -> {
+                    scannedRowsAcc.add(1);
+                    List<Row> result = new ArrayList<>();
+                    List<String> resStringCols = Lists.newArrayList();
+                    boolean validRow = true;
+                    if (record.length() != columnSize) {
+                        LOG.warn("invalid src schema, data columns:" + record.length()
+                                + ", file group columns:" + columnSize + ", row:" + record);
+                        validRow = false;
+                    } else {
+                        for (int i = 0; i < record.length(); ++i) {
+                            StructField field = srcSchema.apply(i);
+                            String srcColumnName = field.name();
+                            Object col = record.get(i);
+                            String strCol = null;
+                            if (col == null && dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                if (!baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
+                                    LOG.warn("column name:" + srcColumnName + ", attribute: " + i
+                                            + " can not be null. row:" + record);
+                                    validRow = false;
+                                    break;
+                                }
+                            } else {
+                                strCol = col.toString();

Review Comment:
   I don't know what the result of this `toString()` is.
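   For reference, here is a minimal, hypothetical probe of what `Row#get(i).toString()` yields for a few common Spark SQL column types as a parquet/orc reader would surface them. The class name `ToStringProbe` and the sample values are illustrative only, not from this PR:

   ```java
   import java.math.BigDecimal;
   import java.sql.Date;
   import java.sql.Timestamp;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.RowFactory;

   public class ToStringProbe {
       public static void main(String[] args) {
           // Build a Row holding typical typed values, similar to what
           // spark.read().parquet(...)/.orc(...) hands back per record.
           Row row = RowFactory.create(
                   1,                                          // IntegerType -> "1"
                   new BigDecimal("3.1400"),                   // DecimalType: keeps scale -> "3.1400"
                   Date.valueOf("2022-10-14"),                 // DateType -> "2022-10-14"
                   Timestamp.valueOf("2022-10-14 12:00:00"),   // TimestampType -> "2022-10-14 12:00:00.0"
                   new byte[] {0x01, 0x02});                   // BinaryType -> "[B@1b6d3586" (array identity, not the bytes)
           for (int i = 0; i < row.length(); ++i) {
               Object col = row.get(i);
               // Each value falls back to its plain Java toString().
               System.out.println(i + " -> " + col.toString());
           }
       }
   }
   ```

   So for scalar types the string is likely what the downstream `ColumnParser` expects, but DecimalType keeps trailing zeros and BinaryType does not round-trip at all, which suggests typed formats may need per-type conversion rather than a blanket `toString()`. As a side note, if `col` is null and `srcColumnName` is absent from `dstColumnNameToIndex`, the quoted else branch would appear to throw a NullPointerException.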