This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 33b6206f699b30eb5b55f4bb28f3991a19c2862a
Author: wangbo <506340...@qq.com>
AuthorDate: Sun Sep 13 11:57:33 2020 +0800

    [SparkDpp] Support complete types (#4524)
    
    For [Spark Load]
    1 support decimal and largeint
    2 add validate logic for char/varchar/decimal
    3 check data load from hive with strict mode
    4 support decimal/date/datetime aggregator
---
 be/src/olap/push_handler.cpp                       |   2 +-
 .../apache/doris/load/loadv2/dpp/ColumnParser.java |  89 +++++++++--
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 170 ++++++++++++++++++---
 .../doris/load/loadv2/dpp/SparkRDDAggregator.java  |  53 +++++--
 .../doris/load/loadv2/dpp/ColumnParserTest.java    | 135 ++++++++++++++++
 .../apache/doris/load/loadv2/dpp/SparkDppTest.java |  67 ++++++++
 6 files changed, 467 insertions(+), 49 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 2a06329..cfd221d 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -1026,7 +1026,7 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
         const void* value = _tuple->get_slot(slot->tuple_offset());
         // try execute init method defined in aggregateInfo
         // by default it only copies data into cell
-        _schema->column(i)->consume(&cell, (const char*)value, is_null, 
+        _schema->column(i)->consume(&cell, (const char*)value, is_null,
                                     _mem_pool.get(), 
_runtime_state->obj_pool());
         // if column(i) is a value column, try execute finalize method defined 
in aggregateInfo
         // to convert data into final format
diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index d5e0cee..c9d6a42 100644
--- 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -19,13 +19,20 @@ package org.apache.doris.load.loadv2.dpp;
 
 import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
+
+    protected static final Logger LOG = 
LogManager.getLogger(ColumnParser.class);
+
     public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws 
SparkDppException {
         String columnType = etlColumn.columnType;
         if (columnType.equalsIgnoreCase("TINYINT")) {
@@ -51,6 +58,10 @@ public abstract class ColumnParser implements Serializable {
                 || columnType.equalsIgnoreCase("BITMAP")
                 || columnType.equalsIgnoreCase("HLL")) {
             return new StringParser(etlColumn);
+        } else if (columnType.equalsIgnoreCase("DECIMALV2")) {
+            return new DecimalParser(etlColumn);
+        } else if (columnType.equalsIgnoreCase("LARGEINT")) {
+            return new LargeIntParser();
         } else {
             throw new SparkDppException("unsupported type:" + columnType);
         }
@@ -63,10 +74,7 @@ class TinyIntParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Short parsed = Short.parseShort(value);
-            if (parsed > 127 || parsed < -128) {
-                return false;
-            }
+            Byte.parseByte(value);
         } catch (NumberFormatException e) {
             return false;
         }
@@ -102,7 +110,7 @@ class BigIntParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Integer.parseInt(value);
+            Long.parseLong(value);
         } catch (NumberFormatException e) {
             return false;
         }
@@ -114,11 +122,11 @@ class FloatParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Float.parseFloat(value);
+            Float ret = Float.parseFloat(value);
+            return !ret.isNaN() && !ret.isInfinite();
         } catch (NumberFormatException e) {
             return false;
         }
-        return true;
     }
 }
 
@@ -126,11 +134,11 @@ class DoubleParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Double.parseDouble(value);
+            Double ret = Double.parseDouble(value);
+            return !ret.isInfinite() && !ret.isNaN();
         } catch (NumberFormatException e) {
             return false;
         }
-        return true;
     }
 }
 
@@ -186,4 +194,67 @@ class StringParser extends ColumnParser {
             throw new RuntimeException("string check failed ", e);
         }
     }
+}
+
+class DecimalParser extends ColumnParser {
+
+    public static int PRECISION = 27;
+    public static int SCALE = 9;
+
+    private BigDecimal maxValue;
+    private BigDecimal minValue;
+
+    public DecimalParser(EtlJobConfig.EtlColumn etlColumn) {
+        StringBuilder precisionStr = new StringBuilder();
+        for (int i = 0; i < etlColumn.precision - etlColumn.scale; i++) {
+            precisionStr.append("9");
+        }
+        StringBuilder scaleStr = new StringBuilder();
+        for (int i = 0; i < etlColumn.scale; i++) {
+            scaleStr.append("9");
+        }
+        maxValue = new BigDecimal(precisionStr.toString() + "." + 
scaleStr.toString());
+        minValue = new BigDecimal("-" + precisionStr.toString() + "." + 
scaleStr.toString());
+    }
+
+    @Override
+    public boolean parse(String value) {
+        try {
+            BigDecimal bigDecimal = new BigDecimal(value);
+            return bigDecimal.precision() - bigDecimal.scale() <= PRECISION - 
SCALE && bigDecimal.scale() <= SCALE;
+        } catch (NumberFormatException e) {
+            return false;
+        } catch (Exception e) {
+            throw new RuntimeException("decimal parse failed ", e);
+        }
+    }
+
+    public BigDecimal getMaxValue() {
+        return maxValue;
+    }
+
+    public BigDecimal getMinValue() {
+        return minValue;
+    }
+}
+
+class LargeIntParser extends ColumnParser {
+
+    private BigInteger maxValue = new 
BigInteger("170141183460469231731687303715884105727");
+    private BigInteger minValue = new 
BigInteger("-170141183460469231731687303715884105728");
+
+    @Override
+    public boolean parse(String value) {
+        try {
+            BigInteger inputValue = new BigInteger(value);
+            return inputValue.compareTo(maxValue) < 0 && 
inputValue.compareTo(minValue) > 0;
+        } catch (NumberFormatException e) {
+            return false;
+        } catch (ArithmeticException e) {
+            LOG.warn("int value is too big even for java BigInteger,value={}" 
+ value);
+            return false;
+        } catch (Exception e) {
+            throw new RuntimeException("large int parse failed:" + value, e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index fd71add..e4c3a23 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.sql.Dataset;
@@ -59,6 +60,8 @@ import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.util.SerializableConfiguration;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -107,6 +110,8 @@ public final class SparkDpp implements java.io.Serializable 
{
     private SerializableConfiguration serializableHadoopConf;
     private DppResult dppResult = new DppResult();
 
+    // just for ut
+    public SparkDpp() {}
 
     public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
         this.spark = spark;
@@ -180,6 +185,8 @@ public final class SparkDpp implements java.io.Serializable 
{
                                                              long tableId,
                                                              
EtlJobConfig.EtlIndex indexMeta,
                                                              
SparkRDDAggregator[] sparkRDDAggregators) throws SparkDppException {
+        // TODO(wb) should deal largint as BigInteger instead of string when 
using biginteger as key,
+        // data type may affect sorting logic
         StructType dstSchema = 
DppUtils.createDstTableSchema(indexMeta.columns, false, true);
         ExpressionEncoder encoder = RowEncoder.apply(dstSchema);
 
@@ -359,12 +366,49 @@ public final class SparkDpp implements 
java.io.Serializable {
         return Pair.of(keyMap.toArray(new Integer[keyMap.size()]), 
valueMap.toArray(new Integer[valueMap.size()]));
     }
 
-    // repartition dataframe by partitionid_bucketid
-    // so data in the same bucket will be consecutive.
-    private JavaPairRDD<List<Object>, Object[]> 
fillTupleWithPartitionColumn(SparkSession spark, Dataset<Row> dataframe,
+    /**
+     *   check decimal,char/varchar
+     */
+    public boolean validateData(Object srcValue, EtlJobConfig.EtlColumn 
etlColumn, ColumnParser columnParser, Row row) {
+
+        switch (etlColumn.columnType.toUpperCase()) {
+            case "DECIMALV2":
+                // TODO(wb):  support decimal round; see be 
DecimalV2Value::round
+                DecimalParser decimalParser = (DecimalParser) columnParser;
+                BigDecimal srcBigDecimal = (BigDecimal) srcValue;
+                if (srcValue != null && 
(decimalParser.getMaxValue().compareTo(srcBigDecimal) < 0 || 
decimalParser.getMinValue().compareTo(srcBigDecimal) > 0)) {
+                    LOG.warn(String.format("decimal value is not valid for 
defination, column=%s, value=%s,precision=%s,scale=%s",
+                            etlColumn.columnName, srcValue.toString(), 
srcBigDecimal.precision(), srcBigDecimal.scale()));
+                    return false;
+                }
+                break;
+            case "CHAR":
+            case "VARCHAR":
+                // TODO(wb) padding char type
+                try {
+                    int strSize = 0;
+                    if (srcValue != null && (strSize = 
srcValue.toString().getBytes("UTF-8").length) > etlColumn.stringLength) {
+                        LOG.warn(String.format("the length of input is too 
long than schema. column_name:%s,input_str[%s],schema length:%s,actual 
length:%s",
+                                etlColumn.columnName, row.toString(), 
etlColumn.stringLength, strSize));
+                        return false;
+                    }
+                } catch (UnsupportedEncodingException e) {
+                    LOG.warn("input string value can not encode with 
utf-8,value=" + srcValue.toString());
+                    return false;
+                }
+                break;
+        }
+        return true;
+    }
+
+    /**
+     *   1 project column and reorder column
+     *   2 validate data
+     *   3 fill tuple with partition column
+     */
+    private JavaPairRDD<List<Object>, Object[]> 
fillTupleWithPartitionColumn(Dataset<Row> dataframe,
                                                         
EtlJobConfig.EtlPartitionInfo partitionInfo,
                                                         List<Integer> 
partitionKeyIndex,
-                                                        List<Class> 
partitionKeySchema,
                                                         
List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys,
                                                         List<String> 
keyColumnNames,
                                                         List<String> 
valueColumnNames,
@@ -385,25 +429,42 @@ public final class SparkDpp implements 
java.io.Serializable {
                 }
             }
         }
+
+        List<ColumnParser> parsers = new ArrayList<>();
+        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+            parsers.add(ColumnParser.create(column));
+        }
+
         // use PairFlatMapFunction instead of PairMapFunction because the 
there will be
         // 0 or 1 output row for 1 input row
         JavaPairRDD<List<Object>, Object[]> resultPairRDD = 
dataframe.toJavaRDD().flatMapToPair(new PairFlatMapFunction<Row, List<Object>, 
Object[]>() {
             @Override
             public Iterator<Tuple2<List<Object>, Object[]>> call(Row row) 
throws Exception {
+                List<Tuple2<List<Object>, Object[]>> result = new 
ArrayList<>();
                 List<Object> keyColumns = new ArrayList<>();
-                Object[] valueColumns = new Object[valueColumnNames.size()];
-                for (String columnName : keyColumnNames) {
+                List<Object> valueColumns = new 
ArrayList<>(valueColumnNames.size());
+                for (int i = 0; i < keyColumnNames.size(); i++) {
+                    String columnName = keyColumnNames.get(i);
                     Object columnObject = row.get(row.fieldIndex(columnName));
+                    if(!validateData(columnObject, 
baseIndex.getColumn(columnName), parsers.get(i), row)) {
+                        abnormalRowAcc.add(1);
+                        return result.iterator();
+                    };
                     keyColumns.add(columnObject);
                 }
 
                 for (int i = 0; i < valueColumnNames.size(); i++) {
-                    valueColumns[i] = 
row.get(row.fieldIndex(valueColumnNames.get(i)));
+                    String columnName = valueColumnNames.get(i);
+                    Object columnObject = row.get(row.fieldIndex(columnName));
+                    if(!validateData(columnObject,  
baseIndex.getColumn(columnName), parsers.get(i + keyColumnNames.size()),row)) {
+                        abnormalRowAcc.add(1);
+                        return result.iterator();
+                    };
+                    valueColumns.add(columnObject);
                 }
 
                 DppColumns key = new DppColumns(keyColumns);
                 int pid = partitioner.getPartition(key);
-                List<Tuple2<List<Object>, Object[]>> result = new 
ArrayList<>();
                 if (!validPartitionIndex.contains(pid)) {
                     LOG.warn("invalid partition for row:" + row + ", pid:" + 
pid);
                     abnormalRowAcc.add(1);
@@ -414,6 +475,7 @@ public final class SparkDpp implements java.io.Serializable 
{
                         LOG.info("invalid rows contents:" + 
invalidRows.value());
                     }
                 } else {
+                    // TODO(wb) support lagreint for hash
                     long hashValue = DppUtils.getHashValue(row, 
distributeColumns, dstTableSchema);
                     int bucketId = (int) ((hashValue & 0xffffffff) % 
partitionInfo.partitions.get(pid).bucketNum);
                     long partitionId = 
partitionInfo.partitions.get(pid).partitionId;
@@ -423,7 +485,7 @@ public final class SparkDpp implements java.io.Serializable 
{
                     List<Object> tuple = new ArrayList<>();
                     tuple.add(bucketKey);
                     tuple.addAll(keyColumns);
-                    result.add(new Tuple2<>(tuple, valueColumns));
+                    result.add(new Tuple2<>(tuple, valueColumns.toArray()));
                 }
                 return result.iterator();
             }
@@ -508,17 +570,6 @@ public final class SparkDpp implements 
java.io.Serializable {
             dataframe = dataframe.withColumn(mappingColumn,
                     
functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
         }
-        // projection and reorder the columns
-        dataframe.createOrReplaceTempView("src_table");
-        StringBuilder selectSqlBuilder = new StringBuilder();
-        selectSqlBuilder.append("select ");
-        for (String name : dstColumnNames) {
-            selectSqlBuilder.append(name + ",");
-        }
-        selectSqlBuilder.deleteCharAt(selectSqlBuilder.length() - 1);
-        selectSqlBuilder.append(" from src_table");
-        String selectSql = selectSqlBuilder.toString();
-        dataframe = spark.sql(selectSql);
         return dataframe;
     }
 
@@ -588,7 +639,8 @@ public final class SparkDpp implements java.io.Serializable 
{
                                     int index = 
dstColumnNameToIndex.get(srcColumnName);
                                     String type = 
columns.get(index).columnType;
                                     if (type.equalsIgnoreCase("CHAR")
-                                            || 
type.equalsIgnoreCase("VARCHAR")) {
+                                            || type.equalsIgnoreCase("VARCHAR")
+                                            || 
fileGroup.columnMappings.containsKey(field.name())) {
                                         continue;
                                     }
                                     ColumnParser parser = parsers.get(index);
@@ -675,6 +727,9 @@ public final class SparkDpp implements java.io.Serializable 
{
             } else if (dstClass.equals(Long.class)) {
                 return ((Double) srcValue).longValue();
             } else if (dstClass.equals(BigInteger.class)) {
+                // TODO(wb) gson will cast origin value to double by default
+                // when the partition column is largeint, this will cause 
error data
+                // need fix it thoroughly
                 return new BigInteger(((Double) srcValue).toString());
             } else if (dstClass.equals(java.sql.Date.class) || 
dstClass.equals(java.util.Date.class)) {
                 double srcValueDouble = (double)srcValue;
@@ -800,10 +855,78 @@ public final class SparkDpp implements 
java.io.Serializable {
         });
         sql.deleteCharAt(sql.length() - 1).append(" from 
").append(hiveDbTableName);
         Dataset<Row> dataframe = spark.sql(sql.toString());
+        dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, 
fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
+                    dstTableSchema);
         dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, 
dstTableSchema, fileGroup);
         return dataframe;
     }
 
+    private Dataset<Row> checkDataFromHiveWithStrictMode(
+            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, 
Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) 
throws SparkDppException {
+        List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new 
ArrayList<>();
+        List<ColumnParser> columnParserArrayList = new ArrayList<>();
+        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+            if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+                    !StringUtils.equalsIgnoreCase(column.columnType, "char") &&
+                    !mappingColKeys.contains(column.columnName)) {
+                columnNameNeedCheckArrayList.add(column);
+                columnParserArrayList.add(ColumnParser.create(column));
+            }
+        }
+
+        ColumnParser[] columnParserArray = columnParserArrayList.toArray(new 
ColumnParser[columnParserArrayList.size()]);
+        EtlJobConfig.EtlColumn[] columnNameArray = 
columnNameNeedCheckArrayList.toArray(new 
EtlJobConfig.EtlColumn[columnNameNeedCheckArrayList.size()]);
+
+        JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new 
FlatMapFunction<Row, Row>() {
+            @Override
+            public Iterator<Row> call(Row row) throws Exception {
+                List<Row> result = new ArrayList<>();
+                Set<Integer> columnIndexNeedToRepalceNull = new 
HashSet<Integer>();
+                boolean validRow = true;
+                for (int i = 0; i < columnNameArray.length; i++) {
+                    EtlJobConfig.EtlColumn column = columnNameArray[i];
+                    int fieldIndex = row.fieldIndex(column.columnName);
+                    Object value = row.get(fieldIndex);
+                    if (value == null && !column.isAllowNull) {
+                        validRow = false;
+                        LOG.warn("column:" + i + " can not be null. row:" + 
row.toString());
+                        break;
+                    }
+                    if (value != null && 
!columnParserArray[i].parse(value.toString())) {
+                        if (isStrictMode) {
+                            validRow = false;
+                            LOG.warn(String.format("row parsed failed in 
strict mode, column name %s, src row %s", column.columnName, row.toString()));
+                        } else {
+                            columnIndexNeedToRepalceNull.add(fieldIndex);
+                        }
+                    }
+                }
+                if (!validRow) {
+                    abnormalRowAcc.add(1);
+                    // at most add 5 rows to invalidRows
+                    if (abnormalRowAcc.value() <= 5) {
+                        invalidRows.add(row.toString());
+                    }
+                } if (columnIndexNeedToRepalceNull.size() != 0) {
+                    Object[] newRow = new Object[row.size()];
+                    for (int i = 0; i < row.size(); i++) {
+                        if (columnIndexNeedToRepalceNull.contains(i)) {
+                            newRow[i] = null;
+                        } else {
+                            newRow[i] = row.get(i);
+                        }
+                    }
+                    result.add(RowFactory.create(newRow));
+                } else {
+                    result.add(row);
+                }
+                return result.iterator();
+            }
+        });
+
+        return spark.createDataFrame(result, dstTableSchema);
+    }
+
     private void process() throws Exception {
         try {
             for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : 
etlJobConfig.tables.entrySet()) {
@@ -872,9 +995,10 @@ public final class SparkDpp implements 
java.io.Serializable {
                         unselectedRowAcc.add(currentSize - originalSize);
                     }
 
-                    JavaPairRDD<List<Object>, Object[]> ret = 
fillTupleWithPartitionColumn(spark, fileGroupDataframe,
+                    JavaPairRDD<List<Object>, Object[]> ret = 
fillTupleWithPartitionColumn(
+                            fileGroupDataframe,
                             partitionInfo, partitionKeyIndex,
-                            partitionKeySchema, partitionRangeKeys,
+                            partitionRangeKeys,
                             keyColumnNames, valueColumnNames,
                             dstTableSchema, baseIndex, fileGroup.partitions);
                     if (tablePairRDD == null) {
diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
index 4682fdc..7f1d030 100644
--- 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
+++ 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
@@ -32,6 +32,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -52,7 +53,6 @@ public abstract class SparkRDDAggregator<T> implements 
Serializable {
         return value;
     };
 
-    // TODO(wb) support more datatype:decimal,date,datetime
     public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn 
column) throws SparkDppException {
         String aggType = StringUtils.lowerCase(column.aggregationType);
         String columnType = StringUtils.lowerCase(column.columnType);
@@ -69,6 +69,9 @@ public abstract class SparkRDDAggregator<T> implements 
Serializable {
                     case "bigint":
                     case "float":
                     case "double":
+                    case "decimalv2":
+                    case "date":
+                    case "datetime":
                         return new NumberMaxAggregator();
                     case "char":
                     case "varchar":
@@ -86,6 +89,9 @@ public abstract class SparkRDDAggregator<T> implements 
Serializable {
                     case "bigint":
                     case "float":
                     case "double":
+                    case "decimalv2":
+                    case "date":
+                    case "datetime":
                         return new NumberMinAggregator();
                     case "char":
                     case "varchar":
@@ -111,6 +117,8 @@ public abstract class SparkRDDAggregator<T> implements 
Serializable {
                         return new DoubleSumAggregator();
                     case "largeint":
                         return new LargeIntSumAggregator();
+                    case "decimalv2":
+                        return new BigDecimalSumAggregator();
                     default:
                         throw new SparkDppException(String.format("unsupported 
sum aggregator for column type:%s", columnType));
                 }
@@ -324,6 +332,12 @@ class LargeIntMaxAggregator extends 
SparkRDDAggregator<BigInteger> {
         }
         return dst.compareTo(src) > 0 ? dst : src;
     }
+
+    @Override
+    String finalize(Object value) {
+        BigInteger bigInteger = (BigInteger) value;
+        return bigInteger.toString();
+    }
 }
 
 class LargeIntMinAggregator extends LargeIntMaxAggregator {
@@ -394,7 +408,6 @@ class LongSumAggregator extends SparkRDDAggregator<Long> {
         if (dst == null) {
             return src;
         }
-        // TODO(wb) check overflow of long type
         return dst + src;
     }
 }
@@ -409,11 +422,9 @@ class ShortSumAggregator extends SparkRDDAggregator<Short> 
{
         if (dst == null) {
             return src;
         }
-        Integer ret = dst + src;
-        if  (ret > Short.MAX_VALUE || ret < Short.MIN_VALUE) {
-            throw new RuntimeException("short column sum size exceeds 
Short.MAX_VALUE or Short.MIN_VALUE");
-        }
-        return Short.valueOf(ret.toString());
+        int ret = dst + src;
+        // here may overflow, just keep the same logic with be
+        return (short)ret;
     }
 }
 
@@ -428,9 +439,7 @@ class IntSumAggregator extends SparkRDDAggregator<Integer> {
             return src;
         }
         long ret = Long.sum(dst, src);
-        if  (ret > Integer.MAX_VALUE || ret < Integer.MIN_VALUE) {
-            throw new RuntimeException("int column sum size exceeds 
Integer.MAX_VALUE or Integer.MIN_VALUE");
-        }
+        // here may overflow, just keep the same logic with be
         return (int) ret;
     }
 }
@@ -445,11 +454,9 @@ class ByteSumAggregator extends SparkRDDAggregator<Byte> {
         if (dst == null) {
             return src;
         }
-        Integer ret = dst + src;
-        if  (ret > Byte.MAX_VALUE || ret < Byte.MIN_VALUE) {
-            throw new RuntimeException("byte column sum size exceeds 
Byte.MAX_VALUE or Byte.MIN_VALUE");
-        }
-        return Byte.valueOf(ret.toString());
+        int ret = dst + src;
+        // here may overflow, just keep the same logic with be
+        return (byte)ret;
     }
 }
 
@@ -467,7 +474,6 @@ class DoubleSumAggregator extends 
SparkRDDAggregator<Double> {
     }
 }
 
-// TODO(wb) add bound check for float/double
 class FloatSumAggregator extends SparkRDDAggregator<Float> {
 
     @Override
@@ -510,6 +516,21 @@ class StringMinAggregator extends 
SparkRDDAggregator<String> {
     }
 }
 
+class BigDecimalSumAggregator extends SparkRDDAggregator<BigDecimal> {
+
+
+    @Override
+    BigDecimal update(BigDecimal src, BigDecimal dst) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return src.add(dst);
+    }
+}
+
 
 class BucketComparator implements Comparator<List<Object>>, Serializable {
 
diff --git 
a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java
 
b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java
new file mode 100644
index 0000000..cfb4122
--- /dev/null
+++ 
b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnParserTest {
+
+    // TODO(wb) try to keep ut consistent with be's ut
+    @Test
+    public void testBoundCheck() {
+        // tinyint
+        TinyIntParser tinyIntParser = new TinyIntParser();
+        // 1 normal
+        String tinyint = "100";
+        Assert.assertTrue(tinyIntParser.parse(tinyint));
+        // 2 upper
+        String tinyintUpper = "128";
+        Assert.assertFalse(tinyIntParser.parse(tinyintUpper));
+        // 3 lower
+        String tinyintLower = "-129";
+        Assert.assertFalse(tinyIntParser.parse(tinyintLower));
+
+        // smallint
+        SmallIntParser smallIntParser = new SmallIntParser();
+        // 1 normal
+        String smallint = "100";
+        Assert.assertTrue(smallIntParser.parse(smallint));
+        // 2 upper
+        String smallintUpper = "32768";
+        Assert.assertFalse(smallIntParser.parse(smallintUpper));
+        // 3 lower
+        String smallintLower = "-32769";
+        Assert.assertFalse(smallIntParser.parse(smallintLower));
+
+        // int
+        IntParser intParser = new IntParser();
+        // 1 normal
+        String intValue = "100";
+        Assert.assertTrue(intParser.parse(intValue));
+        // 2 upper
+        String intUpper = "2147483648";
+        Assert.assertFalse(intParser.parse(intUpper));
+        // 3 lower
+        String intLower = "-2147483649";
+        Assert.assertFalse(intParser.parse(intLower));
+
+        // bigint
+        BigIntParser bigIntParser = new BigIntParser();
+        // 1 normal
+        String bigint = "100";
+        Assert.assertTrue(bigIntParser.parse(bigint));
+        // 2 upper
+        String bigintUpper = "9223372036854775808";
+        Assert.assertFalse(bigIntParser.parse(bigintUpper));
+        // 3 lower
+        String bigintLower = "-9223372036854775809";
+        Assert.assertFalse(bigIntParser.parse(bigintLower));
+
+        // largeint
+        LargeIntParser largeIntParser = new LargeIntParser();
+        // 1 normal
+        String largeint = "100";
+        Assert.assertTrue(largeIntParser.parse(largeint));
+        // 2 upper
+        String largeintUpper = "170141183460469231731687303715884105728";
+        Assert.assertFalse(largeIntParser.parse(largeintUpper));
+        // 3 lower
+        String largeintLower = "-170141183460469231731687303715884105729";
+        Assert.assertFalse(largeIntParser.parse(largeintLower));
+
+        // float
+        FloatParser floatParser = new FloatParser();
+        // normal
+        String floatValue = "1.1";
+        Assert.assertTrue(floatParser.parse(floatValue));
+        // inf
+        String inf = "Infinity";
+        Assert.assertFalse(floatParser.parse(inf));
+        // nan
+        String nan = "NaN";
+        // failed
+        Assert.assertFalse(floatParser.parse(nan));
+
+        // double
+        DoubleParser doubleParser = new DoubleParser();
+        // normal
+        Assert.assertTrue(doubleParser.parse(floatValue));
+        // inf
+        Assert.assertFalse(doubleParser.parse(inf));
+        // nan
+        Assert.assertFalse(doubleParser.parse(nan));
+
+        // decimal
+        EtlJobConfig.EtlColumn etlColumn = new EtlJobConfig.EtlColumn();
+        etlColumn.precision = 5;
+        etlColumn.scale = 3;
+        DecimalParser decimalParser = new DecimalParser(etlColumn);
+        // normal
+        String decimalValue = "10.333";
+        Assert.assertTrue(decimalParser.parse(decimalValue));
+        // overflow
+        String decimalOverflow = "1000.3333333333";
+        Assert.assertFalse(decimalParser.parse(decimalOverflow));
+
+        // string
+        EtlJobConfig.EtlColumn stringColumn = new EtlJobConfig.EtlColumn();
+        stringColumn.stringLength = 3;
+        StringParser stringParser = new StringParser(stringColumn);
+        // normal
+        String stringnormal = "a";
+        Assert.assertTrue(stringParser.parse(stringnormal));
+        // overflow
+        String stringoverflow = "中文";
+        Assert.assertFalse(stringParser.parse(stringoverflow));
+    }
+
+}
diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java
new file mode 100644
index 0000000..9cec220
--- /dev/null
+++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+
+public class SparkDppTest {
+
+    @Test
+    public void testValidateData() {
+        SparkDpp sparkDpp = new SparkDpp();
+
+        // decimal
+        EtlJobConfig.EtlColumn etlColumn = new EtlJobConfig.EtlColumn();
+        etlColumn.columnType = "DECIMALV2";
+        etlColumn.precision = 3;
+        etlColumn.scale = 2;
+
+        DecimalParser decimalParser = new DecimalParser(etlColumn);
+        // test max/min
+        Assert.assertTrue(decimalParser.getMaxValue().toString().equals("9.99"));
+        Assert.assertTrue(decimalParser.getMinValue().toString().equals("-9.99"));
+        // normal
+        BigDecimal bigDecimal = new BigDecimal("1.21");
+        Assert.assertTrue(sparkDpp.validateData(bigDecimal, etlColumn, decimalParser, RowFactory.create(bigDecimal)));
+        // failed
+        BigDecimal bigDecimalFailed = new BigDecimal("10");
+        Assert.assertFalse(sparkDpp.validateData(bigDecimalFailed, etlColumn, decimalParser, RowFactory.create(bigDecimalFailed)));
+
+        // string
+        EtlJobConfig.EtlColumn stringColumn = new EtlJobConfig.EtlColumn();
+        stringColumn.stringLength = 3;
+        stringColumn.columnType = "VARCHAR";
+        StringParser stringParser = new StringParser(stringColumn);
+        // normal
+        String normalString = "a1";
+        Assert.assertTrue(sparkDpp.validateData(normalString, stringColumn, stringParser, RowFactory.create(normalString)));
+        // cn normal
+        String normalStringCN = "中";
+        Assert.assertTrue(sparkDpp.validateData(normalStringCN, stringColumn, stringParser, RowFactory.create(normalStringCN)));
+        // cn failed
+        String failedStringCN = "中a";
+        Assert.assertFalse(sparkDpp.validateData(failedStringCN, stringColumn, stringParser, RowFactory.create(failedStringCN)));
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to