This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 33b6206f699b30eb5b55f4bb28f3991a19c2862a
Author: wangbo <506340...@qq.com>
AuthorDate: Sun Sep 13 11:57:33 2020 +0800

    [SparkDpp] Support complete types (#4524)

    For [Spark Load]
    1 support decimal and largeint
    2 add validate logic for char/varchar/decimal
    3 check data load from hive with strict mode
    4 support decimal/date/datetime aggregator
---
 be/src/olap/push_handler.cpp                        |   2 +-
 .../apache/doris/load/loadv2/dpp/ColumnParser.java  |  89 +++++++++--
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java  | 170 ++++++++++++++++++---
 .../doris/load/loadv2/dpp/SparkRDDAggregator.java   |  53 +++++--
 .../doris/load/loadv2/dpp/ColumnParserTest.java     | 135 ++++++++++++++++
 .../apache/doris/load/loadv2/dpp/SparkDppTest.java  |  67 ++++++++
 6 files changed, 467 insertions(+), 49 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 2a06329..cfd221d 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -1026,7 +1026,7 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) { const void* value = _tuple->get_slot(slot->tuple_offset()); // try execute init method defined in aggregateInfo // by default it only copies data into cell - _schema->column(i)->consume(&cell, (const char*)value, is_null, + _schema->column(i)->consume(&cell, (const char*)value, is_null, _mem_pool.get(), _runtime_state->obj_pool()); // if column(i) is a value column, try execute finalize method defined in aggregateInfo // to convert data into final format
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java index d5e0cee..c9d6a42 100644 --- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java @@ -19,13 +19,20 @@ package org.apache.doris.load.loadv2.dpp; import org.apache.doris.common.SparkDppException; import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.joda.time.DateTime; import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Date; // Parser to validate value for different type public abstract class ColumnParser implements Serializable { + + protected static final Logger LOG = LogManager.getLogger(ColumnParser.class); + public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException { String columnType = etlColumn.columnType; if (columnType.equalsIgnoreCase("TINYINT")) { @@ -51,6 +58,10 @@ public abstract class ColumnParser implements Serializable { || columnType.equalsIgnoreCase("BITMAP") || columnType.equalsIgnoreCase("HLL")) { return new StringParser(etlColumn); + } else if (columnType.equalsIgnoreCase("DECIMALV2")) { + return new DecimalParser(etlColumn); + } else if (columnType.equalsIgnoreCase("LARGEINT")) { + return new LargeIntParser(); } else { throw new SparkDppException("unsupported type:" + columnType); } @@ -63,10 +74,7 @@ public abstract class ColumnParser implements Serializable { @Override public boolean parse(String value) { try { - Short parsed = Short.parseShort(value); - if (parsed > 127 || parsed < -128) { - return false; - } + Byte.parseByte(value); } catch (NumberFormatException e) { return false; } @@ -102,7 +110,7 @@ class BigIntParser extends ColumnParser { @Override public boolean parse(String value) { try { - Integer.parseInt(value); + Long.parseLong(value); } catch 
(NumberFormatException e) { return false; } @@ -114,11 +122,11 @@ class FloatParser extends ColumnParser { @Override public boolean parse(String value) { try { - Float.parseFloat(value); + Float ret = Float.parseFloat(value); + return !ret.isNaN() && !ret.isInfinite(); } catch (NumberFormatException e) { return false; } - return true; } } @@ -126,11 +134,11 @@ class DoubleParser extends ColumnParser { @Override public boolean parse(String value) { try { - Double.parseDouble(value); + Double ret = Double.parseDouble(value); + return !ret.isInfinite() && !ret.isNaN(); } catch (NumberFormatException e) { return false; } - return true; } } @@ -186,4 +194,67 @@ class StringParser extends ColumnParser { throw new RuntimeException("string check failed ", e); } } +} + +class DecimalParser extends ColumnParser { + + public static int PRECISION = 27; + public static int SCALE = 9; + + private BigDecimal maxValue; + private BigDecimal minValue; + + public DecimalParser(EtlJobConfig.EtlColumn etlColumn) { + StringBuilder precisionStr = new StringBuilder(); + for (int i = 0; i < etlColumn.precision - etlColumn.scale; i++) { + precisionStr.append("9"); + } + StringBuilder scaleStr = new StringBuilder(); + for (int i = 0; i < etlColumn.scale; i++) { + scaleStr.append("9"); + } + maxValue = new BigDecimal(precisionStr.toString() + "." + scaleStr.toString()); + minValue = new BigDecimal("-" + precisionStr.toString() + "." + scaleStr.toString()); + } + + @Override + public boolean parse(String value) { + try { + BigDecimal bigDecimal = new BigDecimal(value); + return bigDecimal.precision() - bigDecimal.scale() <= PRECISION - SCALE && bigDecimal.scale() <= SCALE; + } catch (NumberFormatException e) { + return false; + } catch (Exception e) { + throw new RuntimeException("decimal parse failed ", e); + } + } + + public BigDecimal getMaxValue() { + return maxValue; + } + + public BigDecimal getMinValue() { + return minValue; + } +} + +class LargeIntParser extends ColumnParser { + + private BigInteger maxValue = new BigInteger("170141183460469231731687303715884105727"); + private BigInteger minValue = new BigInteger("-170141183460469231731687303715884105728"); + + @Override + public boolean parse(String value) { + try { + BigInteger inputValue = new BigInteger(value); + return inputValue.compareTo(maxValue) < 0 && inputValue.compareTo(minValue) > 0; + } catch (NumberFormatException e) { + return false; + } catch (ArithmeticException e) { + LOG.warn("int value is too big even for java BigInteger,value={}" + value); + return false; + } catch (Exception e) { + throw new RuntimeException("large int parse failed:" + value, e); + } + } } \ No newline at end of file diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java index fd71add..e4c3a23 100644 --- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java @@ -40,6 +40,7 @@ import org.apache.spark.Partitioner; import org.apache.spark.TaskContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; @@ -59,6 +60,8 @@ import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.SerializableConfiguration; 
import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; @@ -107,6 +110,8 @@ public final class SparkDpp implements java.io.Serializable { private SerializableConfiguration serializableHadoopConf; private DppResult dppResult = new DppResult(); + // just for ut + public SparkDpp() {} public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) { this.spark = spark; @@ -180,6 +185,8 @@ public final class SparkDpp implements java.io.Serializable { long tableId, EtlJobConfig.EtlIndex indexMeta, SparkRDDAggregator[] sparkRDDAggregators) throws SparkDppException { + // TODO(wb) should deal largint as BigInteger instead of string when using biginteger as key, + // data type may affect sorting logic StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true); ExpressionEncoder encoder = RowEncoder.apply(dstSchema); @@ -359,12 +366,49 @@ public final class SparkDpp implements java.io.Serializable { return Pair.of(keyMap.toArray(new Integer[keyMap.size()]), valueMap.toArray(new Integer[valueMap.size()])); } - // repartition dataframe by partitionid_bucketid - // so data in the same bucket will be consecutive. - private JavaPairRDD<List<Object>, Object[]> fillTupleWithPartitionColumn(SparkSession spark, Dataset<Row> dataframe, + /** + * check decimal,char/varchar + */ + public boolean validateData(Object srcValue, EtlJobConfig.EtlColumn etlColumn, ColumnParser columnParser, Row row) { + + switch (etlColumn.columnType.toUpperCase()) { + case "DECIMALV2": + // TODO(wb): support decimal round; see be DecimalV2Value::round + DecimalParser decimalParser = (DecimalParser) columnParser; + BigDecimal srcBigDecimal = (BigDecimal) srcValue; + if (srcValue != null && (decimalParser.getMaxValue().compareTo(srcBigDecimal) < 0 || decimalParser.getMinValue().compareTo(srcBigDecimal) > 0)) { + LOG.warn(String.format("decimal value is not valid for defination, column=%s, value=%s,precision=%s,scale=%s", + etlColumn.columnName, srcValue.toString(), srcBigDecimal.precision(), srcBigDecimal.scale())); + return false; + } + break; + case "CHAR": + case "VARCHAR": + // TODO(wb) padding char type + try { + int strSize = 0; + if (srcValue != null && (strSize = srcValue.toString().getBytes("UTF-8").length) > etlColumn.stringLength) { + LOG.warn(String.format("the length of input is too long than schema. 
column_name:%s,input_str[%s],schema length:%s,actual length:%s", + etlColumn.columnName, row.toString(), etlColumn.stringLength, strSize)); + return false; + } + } catch (UnsupportedEncodingException e) { + LOG.warn("input string value can not encode with utf-8,value=" + srcValue.toString()); + return false; + } + break; + } + return true; + } + + /** + * 1 project column and reorder column + * 2 validate data + * 3 fill tuple with partition column + */ + private JavaPairRDD<List<Object>, Object[]> fillTupleWithPartitionColumn(Dataset<Row> dataframe, EtlJobConfig.EtlPartitionInfo partitionInfo, List<Integer> partitionKeyIndex, - List<Class> partitionKeySchema, List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys, List<String> keyColumnNames, List<String> valueColumnNames, @@ -385,25 +429,42 @@ public final class SparkDpp implements java.io.Serializable { } } } + + List<ColumnParser> parsers = new ArrayList<>(); + for (EtlJobConfig.EtlColumn column : baseIndex.columns) { + parsers.add(ColumnParser.create(column)); + } + // use PairFlatMapFunction instead of PairMapFunction because the there will be // 0 or 1 output row for 1 input row JavaPairRDD<List<Object>, Object[]> resultPairRDD = dataframe.toJavaRDD().flatMapToPair(new PairFlatMapFunction<Row, List<Object>, Object[]>() { @Override public Iterator<Tuple2<List<Object>, Object[]>> call(Row row) throws Exception { + List<Tuple2<List<Object>, Object[]>> result = new ArrayList<>(); List<Object> keyColumns = new ArrayList<>(); - Object[] valueColumns = new Object[valueColumnNames.size()]; - for (String columnName : keyColumnNames) { + List<Object> valueColumns = new ArrayList<>(valueColumnNames.size()); + for (int i = 0; i < keyColumnNames.size(); i++) { + String columnName = keyColumnNames.get(i); Object columnObject = row.get(row.fieldIndex(columnName)); + if(!validateData(columnObject, baseIndex.getColumn(columnName), parsers.get(i), row)) { + abnormalRowAcc.add(1); + return result.iterator(); + }; keyColumns.add(columnObject); } for (int i = 0; i < valueColumnNames.size(); i++) { - valueColumns[i] = row.get(row.fieldIndex(valueColumnNames.get(i))); + String columnName = valueColumnNames.get(i); + Object columnObject = row.get(row.fieldIndex(columnName)); + if(!validateData(columnObject, baseIndex.getColumn(columnName), parsers.get(i + keyColumnNames.size()),row)) { + abnormalRowAcc.add(1); + return result.iterator(); + }; + valueColumns.add(columnObject); } DppColumns key = new DppColumns(keyColumns); int pid = partitioner.getPartition(key); - List<Tuple2<List<Object>, Object[]>> result = new ArrayList<>(); if (!validPartitionIndex.contains(pid)) { LOG.warn("invalid partition for row:" + row + ", pid:" + pid); abnormalRowAcc.add(1); @@ -414,6 +475,7 @@ public final class SparkDpp implements java.io.Serializable { LOG.info("invalid rows contents:" + invalidRows.value()); } } else { + // TODO(wb) support lagreint for hash long hashValue = DppUtils.getHashValue(row, distributeColumns, dstTableSchema); int bucketId = (int) ((hashValue & 0xffffffff) % partitionInfo.partitions.get(pid).bucketNum); long partitionId = partitionInfo.partitions.get(pid).partitionId; @@ -423,7 +485,7 @@ public final class SparkDpp implements java.io.Serializable { List<Object> tuple = new ArrayList<>(); tuple.add(bucketKey); tuple.addAll(keyColumns); - result.add(new Tuple2<>(tuple, valueColumns)); + result.add(new Tuple2<>(tuple, valueColumns.toArray())); } return result.iterator(); } @@ -508,17 +570,6 @@ public final class SparkDpp implements 
java.io.Serializable { dataframe = dataframe.withColumn(mappingColumn, functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType())); } - // projection and reorder the columns - dataframe.createOrReplaceTempView("src_table"); - StringBuilder selectSqlBuilder = new StringBuilder(); - selectSqlBuilder.append("select "); - for (String name : dstColumnNames) { - selectSqlBuilder.append(name + ","); - } - selectSqlBuilder.deleteCharAt(selectSqlBuilder.length() - 1); - selectSqlBuilder.append(" from src_table"); - String selectSql = selectSqlBuilder.toString(); - dataframe = spark.sql(selectSql); return dataframe; } @@ -588,7 +639,8 @@ public final class SparkDpp implements java.io.Serializable { int index = dstColumnNameToIndex.get(srcColumnName); String type = columns.get(index).columnType; if (type.equalsIgnoreCase("CHAR") - || type.equalsIgnoreCase("VARCHAR")) { + || type.equalsIgnoreCase("VARCHAR") + || fileGroup.columnMappings.containsKey(field.name())) { continue; } ColumnParser parser = parsers.get(index); @@ -675,6 +727,9 @@ public final class SparkDpp implements java.io.Serializable { } else if (dstClass.equals(Long.class)) { return ((Double) srcValue).longValue(); } else if (dstClass.equals(BigInteger.class)) { + // TODO(wb) gson will cast origin value to double by default + // when the partition column is largeint, this will cause error data + // need fix it thoroughly return new BigInteger(((Double) srcValue).toString()); } else if (dstClass.equals(java.sql.Date.class) || dstClass.equals(java.util.Date.class)) { double srcValueDouble = (double)srcValue; @@ -800,10 +855,78 @@ public final class SparkDpp implements java.io.Serializable { }); sql.deleteCharAt(sql.length() - 1).append(" from ").append(hiveDbTableName); Dataset<Row> dataframe = spark.sql(sql.toString()); + dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode, + dstTableSchema); dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup); return dataframe; } + private Dataset<Row> checkDataFromHiveWithStrictMode( + Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException { + List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>(); + List<ColumnParser> columnParserArrayList = new ArrayList<>(); + for (EtlJobConfig.EtlColumn column : baseIndex.columns) { + if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") && + !StringUtils.equalsIgnoreCase(column.columnType, "char") && + !mappingColKeys.contains(column.columnName)) { + columnNameNeedCheckArrayList.add(column); + columnParserArrayList.add(ColumnParser.create(column)); + } + } + + ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[columnParserArrayList.size()]); + EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[columnNameNeedCheckArrayList.size()]); + + JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() { + @Override + public Iterator<Row> call(Row row) throws Exception { + List<Row> result = new ArrayList<>(); + Set<Integer> columnIndexNeedToRepalceNull = new HashSet<Integer>(); + boolean validRow = true; + for (int i = 0; i < columnNameArray.length; i++) { + EtlJobConfig.EtlColumn column = columnNameArray[i]; + int fieldIndex = row.fieldIndex(column.columnName); + Object 
value = row.get(fieldIndex); + if (value == null && !column.isAllowNull) { + validRow = false; + LOG.warn("column:" + i + " can not be null. row:" + row.toString()); + break; + } + if (value != null && !columnParserArray[i].parse(value.toString())) { + if (isStrictMode) { + validRow = false; + LOG.warn(String.format("row parsed failed in strict mode, column name %s, src row %s", column.columnName, row.toString())); + } else { + columnIndexNeedToRepalceNull.add(fieldIndex); + } + } + } + if (!validRow) { + abnormalRowAcc.add(1); + // at most add 5 rows to invalidRows + if (abnormalRowAcc.value() <= 5) { + invalidRows.add(row.toString()); + } + } if (columnIndexNeedToRepalceNull.size() != 0) { + Object[] newRow = new Object[row.size()]; + for (int i = 0; i < row.size(); i++) { + if (columnIndexNeedToRepalceNull.contains(i)) { + newRow[i] = null; + } else { + newRow[i] = row.get(i); + } + } + result.add(RowFactory.create(newRow)); + } else { + result.add(row); + } + return result.iterator(); + } + }); + + return spark.createDataFrame(result, dstTableSchema); + } + private void process() throws Exception { try { for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) { @@ -872,9 +995,10 @@ public final class SparkDpp implements java.io.Serializable { unselectedRowAcc.add(currentSize - originalSize); } - JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(spark, fileGroupDataframe, + JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn( + fileGroupDataframe, partitionInfo, partitionKeyIndex, - partitionKeySchema, partitionRangeKeys, + partitionRangeKeys, keyColumnNames, valueColumnNames, dstTableSchema, baseIndex, fileGroup.partitions); if (tablePairRDD == null) { diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java index 4682fdc..7f1d030 100644 --- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java +++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java @@ -32,6 +32,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.Serializable; +import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; import java.util.Comparator; @@ -52,7 +53,6 @@ public abstract class SparkRDDAggregator<T> implements Serializable { return value; }; - // TODO(wb) support more datatype:decimal,date,datetime public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws SparkDppException { String aggType = StringUtils.lowerCase(column.aggregationType); String columnType = StringUtils.lowerCase(column.columnType); @@ -69,6 +69,9 @@ public abstract class SparkRDDAggregator<T> implements Serializable { case "bigint": case "float": case "double": + case "decimalv2": + case "date": + case "datetime": return new NumberMaxAggregator(); case "char": case "varchar": @@ -86,6 +89,9 @@ public abstract class SparkRDDAggregator<T> implements Serializable { case "bigint": case "float": case "double": + case "decimalv2": + case "date": + case "datetime": return new NumberMinAggregator(); case "char": case "varchar": @@ -111,6 +117,8 @@ public abstract class SparkRDDAggregator<T> implements Serializable { return new DoubleSumAggregator(); case "largeint": return new LargeIntSumAggregator(); + case "decimalv2": + return new BigDecimalSumAggregator(); default: throw new 
SparkDppException(String.format("unsupported sum aggregator for column type:%s", columnType)); } @@ -324,6 +332,12 @@ class LargeIntMaxAggregator extends SparkRDDAggregator<BigInteger> { } return dst.compareTo(src) > 0 ? dst : src; } + + @Override + String finalize(Object value) { + BigInteger bigInteger = (BigInteger) value; + return bigInteger.toString(); + } } class LargeIntMinAggregator extends LargeIntMaxAggregator { @@ -394,7 +408,6 @@ class LongSumAggregator extends SparkRDDAggregator<Long> { if (dst == null) { return src; } - // TODO(wb) check overflow of long type return dst + src; } } @@ -409,11 +422,9 @@ class ShortSumAggregator extends SparkRDDAggregator<Short> { if (dst == null) { return src; } - Integer ret = dst + src; - if (ret > Short.MAX_VALUE || ret < Short.MIN_VALUE) { - throw new RuntimeException("short column sum size exceeds Short.MAX_VALUE or Short.MIN_VALUE"); - } - return Short.valueOf(ret.toString()); + int ret = dst + src; + // here may overflow, just keep the same logic with be + return (short)ret; } } @@ -428,9 +439,7 @@ class IntSumAggregator extends SparkRDDAggregator<Integer> { return src; } long ret = Long.sum(dst, src); - if (ret > Integer.MAX_VALUE || ret < Integer.MIN_VALUE) { - throw new RuntimeException("int column sum size exceeds Integer.MAX_VALUE or Integer.MIN_VALUE"); - } + // here may overflow, just keep the same logic with be return (int) ret; } } @@ -445,11 +454,9 @@ class ByteSumAggregator extends SparkRDDAggregator<Byte> { if (dst == null) { return src; } - Integer ret = dst + src; - if (ret > Byte.MAX_VALUE || ret < Byte.MIN_VALUE) { - throw new RuntimeException("byte column sum size exceeds Byte.MAX_VALUE or Byte.MIN_VALUE"); - } - return Byte.valueOf(ret.toString()); + int ret = dst + src; + // here may overflow, just keep the same logic with be + return (byte)ret; } } @@ -467,7 +474,6 @@ class DoubleSumAggregator extends SparkRDDAggregator<Double> { } } -// TODO(wb) add bound check for float/double class FloatSumAggregator extends SparkRDDAggregator<Float> { @Override @@ -510,6 +516,21 @@ class StringMinAggregator extends SparkRDDAggregator<String> { } } +class BigDecimalSumAggregator extends SparkRDDAggregator<BigDecimal> { + + + @Override + BigDecimal update(BigDecimal src, BigDecimal dst) { + if (src == null) { + return dst; + } + if (dst == null) { + return src; + } + return src.add(dst); + } +} + class BucketComparator implements Comparator<List<Object>>, Serializable { diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java new file mode 100644 index 0000000..cfb4122 --- /dev/null +++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// +package org.apache.doris.load.loadv2.dpp; + +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.junit.Assert; +import org.junit.Test; + +public class ColumnParserTest { + + // TODO(wb) try to keep ut consistent with be's ut + @Test + public void testBoundCheck() { + // tinyint + TinyIntParser tinyIntParser = new TinyIntParser(); + // 1 normal + String tinyint = "100"; + Assert.assertTrue(tinyIntParser.parse(tinyint)); + // 2 upper + String tinyintUpper = "128"; + Assert.assertFalse(tinyIntParser.parse(tinyintUpper)); + // 3 lower + String tinyintLower = "-129"; + Assert.assertFalse(tinyIntParser.parse(tinyintLower)); + + // smallint + SmallIntParser smallIntParser = new SmallIntParser(); + // 1 normal + String smallint = "100"; + Assert.assertTrue(smallIntParser.parse(smallint)); + // 2 upper + String smallintUpper = "32768"; + Assert.assertFalse(smallIntParser.parse(smallintUpper)); + // 3 lower + String smallintLower = "-32769"; + Assert.assertFalse(smallIntParser.parse(smallintLower)); + + // int + IntParser intParser = new IntParser(); + // 1 normal + String intValue = "100"; + Assert.assertTrue(intParser.parse(intValue)); + // 2 upper + String intUpper = "2147483648"; + Assert.assertFalse(intParser.parse(intUpper)); + // 3 lower + String intLower = "-2147483649"; + Assert.assertFalse(intParser.parse(intLower)); + + // bigint + BigIntParser bigIntParser = new BigIntParser(); + // 1 normal + String bigint = "100"; + Assert.assertTrue(bigIntParser.parse(bigint)); + // 2 upper + String bigintUpper = "9223372036854775808"; + Assert.assertFalse(bigIntParser.parse(bigintUpper)); + // 3 lower + String bigintLower = "-9223372036854775809"; + Assert.assertFalse(bigIntParser.parse(bigintLower)); + + // largeint + LargeIntParser largeIntParser = new LargeIntParser(); + // 1 normal + String largeint = "100"; + Assert.assertTrue(largeIntParser.parse(largeint)); + // 2 upper + String largeintUpper = "170141183460469231731687303715884105728"; + Assert.assertFalse(largeIntParser.parse(largeintUpper)); + // 3 lower + String largeintLower = "-170141183460469231731687303715884105729"; + Assert.assertFalse(largeIntParser.parse(largeintLower)); + + // float + FloatParser floatParser = new FloatParser(); + // normal + String floatValue = "1.1"; + Assert.assertTrue(floatParser.parse(floatValue)); + // inf + String inf = "Infinity"; + Assert.assertFalse(floatParser.parse(inf)); + // nan + String nan = "NaN"; + // failed + Assert.assertFalse(floatParser.parse(nan)); + + // double + DoubleParser doubleParser = new DoubleParser(); + // normal + Assert.assertTrue(doubleParser.parse(floatValue)); + // inf + Assert.assertFalse(doubleParser.parse(inf)); + // nan + Assert.assertFalse(doubleParser.parse(nan)); + + // decimal + EtlJobConfig.EtlColumn etlColumn = new EtlJobConfig.EtlColumn(); + etlColumn.precision = 5; + etlColumn.scale = 3; + DecimalParser decimalParser = new DecimalParser(etlColumn); + // normal + String decimalValue = "10.333"; + Assert.assertTrue(decimalParser.parse(decimalValue)); + // overflow + String decimalOverflow = "1000.3333333333"; + Assert.assertFalse(decimalParser.parse(decimalOverflow)); + + // string + EtlJobConfig.EtlColumn stringColumn = new EtlJobConfig.EtlColumn(); + stringColumn.stringLength = 3; + StringParser stringParser = new StringParser(stringColumn); + // normal + String stringnormal = "a"; + Assert.assertTrue(stringParser.parse(stringnormal)); + // 
overflow + String stringoverflow = "中文"; + Assert.assertFalse(stringParser.parse(stringoverflow)); + } + +} diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java new file mode 100644 index 0000000..9cec220 --- /dev/null +++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package org.apache.doris.load.loadv2.dpp; + +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.math.BigDecimal; + +public class SparkDppTest { + + @Test + public void testValidateData() { + SparkDpp sparkDpp = new SparkDpp(); + + // decimal + EtlJobConfig.EtlColumn etlColumn = new EtlJobConfig.EtlColumn(); + etlColumn.columnType = "DECIMALV2"; + etlColumn.precision = 3; + etlColumn.scale = 2; + + DecimalParser decimalParser = new DecimalParser(etlColumn); + // test max/min + Assert.assertTrue(decimalParser.getMaxValue().toString().equals("9.99")); + Assert.assertTrue(decimalParser.getMinValue().toString().equals("-9.99")); + // normal + BigDecimal bigDecimal = new BigDecimal("1.21"); + Assert.assertTrue(sparkDpp.validateData(bigDecimal, etlColumn, decimalParser, RowFactory.create(bigDecimal))); + // failed + BigDecimal bigDecimalFailed = new BigDecimal("10"); + Assert.assertFalse(sparkDpp.validateData(bigDecimalFailed, etlColumn, decimalParser, RowFactory.create(bigDecimalFailed))); + + // string + EtlJobConfig.EtlColumn stringColumn = new EtlJobConfig.EtlColumn(); + stringColumn.stringLength = 3; + stringColumn.columnType = "VARCHAR"; + StringParser stringParser = new StringParser(stringColumn); + // normal + String normalString = "a1"; + Assert.assertTrue(sparkDpp.validateData(normalString, stringColumn, stringParser, RowFactory.create(normalString))); + // cn normal + String normalStringCN = "中"; + Assert.assertTrue(sparkDpp.validateData(normalStringCN, stringColumn, stringParser, RowFactory.create(normalStringCN))); + // cn failed + String failedStringCN = "中a"; + Assert.assertFalse(sparkDpp.validateData(failedStringCN, stringColumn, stringParser, RowFactory.create(failedStringCN))); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org