kangkaisen commented on a change in pull request #3296: spark data preparation process, issue #3295
URL: https://github.com/apache/incubator-doris/pull/3296#discussion_r407070222
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
 ##########
 @@ -0,0 +1,848 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.function.ForeachPartitionFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
+import org.apache.spark.util.LongAccumulator;
+import scala.Tuple2;
+import scala.collection.Seq;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.math.BigInteger;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import scala.collection.JavaConverters;
+
+// This class is a Spark-based data preprocessing program. It uses Spark's
+// distributed compute framework to run the ETL, sort, pre-aggregation and
+// data format conversion work as a Spark job, in order to speed up loading
+// large amounts of data.
+public final class SparkDpp implements java.io.Serializable {
+    private static final String NULL_FLAG = "\\N";
+    private static final String DPP_RESULT_FILE = "dpp_result.json";
+    private static final String BITMAP_TYPE = "bitmap";
+    private EtlJobConfig etlJobConfig = null;
+    private LongAccumulator abnormalRowAcc = null;
+    private LongAccumulator unselectedRowAcc = null;
+    private LongAccumulator scannedRowsAcc = null;
+    private LongAccumulator fileNumberAcc = null;
+    private LongAccumulator fileSizeAcc = null;
+    private DppResult dppResult;
+    private SparkSession spark = null;
+    private UserDefinedAggregateFunction bitmap_union = null;
+
+    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+        this.spark = spark;
+        this.etlJobConfig = etlJobConfig;
+    }
+
+    public void init() {
+        bitmap_union = spark.udf().register("bitmap_union", new BitmapUnion());
+        abnormalRowAcc = spark.sparkContext().longAccumulator();
+        unselectedRowAcc = spark.sparkContext().longAccumulator();
+        scannedRowsAcc = spark.sparkContext().longAccumulator();
+        fileNumberAcc = spark.sparkContext().longAccumulator();
+        fileSizeAcc = spark.sparkContext().longAccumulator();
+    }
+
+    private Dataset<Row> processDataframeAgg(Dataset<Row> dataframe, EtlJobConfig.EtlIndex indexMeta) {
+        dataframe.createOrReplaceTempView("base_table");
+        StringBuilder sb = new StringBuilder();
+        sb.append("select ");
+        sb.append(DppUtils.BUCKET_ID + ",");
+
+        // assume that keys are all before values
+        StringBuilder groupBySb = new StringBuilder();
+        groupBySb.append(DppUtils.BUCKET_ID + ",");
+        Map<String, DataType> valueColumnsOriginalType = new HashMap<>();
+        for (EtlJobConfig.EtlColumn column : indexMeta.columns) {
+            if (column.isKey) {
+                sb.append(column.columnName + ",");
+                groupBySb.append(column.columnName + ",");
+            } else {
+                // get the value column's original type
+                DataType originalType = dataframe.schema().apply(column.columnName).dataType();
+                valueColumnsOriginalType.put(column.columnName, originalType);
+                if (column.aggregationType.equalsIgnoreCase("MAX")) {
+                    sb.append("max(" + column.columnName + ") as " + column.columnName);
+                    sb.append(",");
+                } else if (column.aggregationType.equalsIgnoreCase("MIN")) {
+                    sb.append("min(" + column.columnName + ") as " + column.columnName);
+                    sb.append(",");
+                } else if (column.aggregationType.equalsIgnoreCase("SUM")) {
+                    sb.append("sum(" + column.columnName + ") as " + column.columnName);
+                    sb.append(",");
+                } else if (column.aggregationType.equalsIgnoreCase("BITMAP_UNION")) {
+                    sb.append("bitmap_union(" + column.columnName + ") as " + column.columnName);
+                    sb.append(",");
+                }
+            }
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        groupBySb.deleteCharAt(groupBySb.length() - 1);
+        sb.append(" from base_table ");
+        sb.append(" group by ");
+        sb.append(groupBySb.toString());
+        String aggSql = sb.toString();
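+        // For example, for an index with key column k1 and a SUM value column v1, the
+        // generated SQL is roughly:
+        //   select <DppUtils.BUCKET_ID>, k1, sum(v1) as v1 from base_table group by <DppUtils.BUCKET_ID>, k1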
+        Dataset<Row> aggDataFrame = spark.sql(aggSql);
+        // after aggregation, the type of a sum column may have changed, so cast each
+        // value column back to its original type
+        for (Map.Entry<String, DataType> entry : valueColumnsOriginalType.entrySet()) {
+            DataType currentType = aggDataFrame.schema().apply(entry.getKey()).dataType();
+            if (!currentType.equals(entry.getValue())) {
+                aggDataFrame = aggDataFrame.withColumn(entry.getKey(), aggDataFrame.col(entry.getKey()).cast(entry.getValue()));
+            }
+        }
+        return aggDataFrame;
+    }
+
+    private void writePartitionedAndSortedDataframeToParquet(Dataset<Row> dataframe, String pathPattern, long tableId,
+                                                             EtlJobConfig.EtlIndex indexMeta) {
+        dataframe.foreachPartition(new ForeachPartitionFunction<Row>() {
+            @Override
+            public void call(Iterator<Row> t) throws Exception {
+                // write the data to dst file
+                Configuration conf = new Configuration();
+                FileSystem fs = FileSystem.get(URI.create(etlJobConfig.outputPath), conf);
+                String lastBucketKey = null;
+                ParquetWriter<Group> writer = null;
+                Types.MessageTypeBuilder builder = Types.buildMessage();
+                for (EtlJobConfig.EtlColumn column : indexMeta.columns) {
+                    if (column.isAllowNull) {
+                        if (column.columnType.equals("SMALLINT") || column.columnType.equals("INT")) {
+                            builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(column.columnName);
+                        } else if (column.columnType.equals("BIGINT")) {
+                            builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(column.columnName);
+                        } else if (column.columnType.equals("BOOL")) {
+                            builder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(column.columnName);
+                        } else if (column.columnType.equals("VARCHAR")) {
+                            // should use as(OriginalType.UTF8), or result will be binary
+                            builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(column.columnName);
+                        } else if (column.columnType.equals("FLOAT")) {
+                            builder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(column.columnName);
+                        } else if (column.columnType.equals("DOUBLE")) {
+                            builder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(column.columnName);
+                        } else {
+                            // TODO: exception handle
+                            System.err.println("invalid column type:" + column);
+                        }
+                    } else {
+                        if (column.columnType.equals("SMALLINT") || column.columnType.equals("INT")) {
+                            builder.required(PrimitiveType.PrimitiveTypeName.INT32).named(column.columnName);
+                        } else if (column.columnType.equals("BIGINT")) {
+                            builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(column.columnName);
+                        } else if (column.columnType.equals("BOOL")) {
+                            builder.required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(column.columnName);
+                        } else if (column.columnType.equals("VARCHAR")) {
+                            // should use as(OriginalType.UTF8), or result will be binary
+                            builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(column.columnName);
+                        } else if (column.columnType.equals("FLOAT")) {
+                            builder.required(PrimitiveType.PrimitiveTypeName.FLOAT).named(column.columnName);
+                        } else if (column.columnType.equals("DOUBLE")) {
+                            builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(column.columnName);
+                        } else {
+                            // TODO: exception handle
 
 Review comment:
   This TODO should be fixed.
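   For example, one possible fix (just a sketch; the project may prefer a different exception type or a dpp-specific one) is to fail the job instead of only printing to stderr. UserException is already imported in this file, and call() is declared to throw Exception:

       } else {
           // Hypothetical sketch: surface the unsupported column type as an exception
           // so the load job fails fast instead of silently producing a bad schema.
           throw new UserException("unsupported column type for parquet writer: "
                   + column.columnType + ", column: " + column.columnName);
       }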

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
