This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 2064f43 [fix] null value is written as null string in csv format (#166) 2064f43 is described below commit 2064f4395a2e9d7fafbd58daab8123f1569cceef Author: gnehil <adamlee...@gmail.com> AuthorDate: Tue Dec 5 15:03:15 2023 +0800 [fix] null value is written as null string in csv format (#166) --- .../doris/spark/load/RecordBatchInputStream.java | 6 +- .../java/org/apache/doris/spark/util/DataUtil.java | 42 ++++------ .../org/apache/doris/spark/util/DataUtilTest.java | 90 ++++++++++++++++++++++ 3 files changed, 107 insertions(+), 31 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java index 047ac3b..544e683 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java @@ -164,11 +164,7 @@ public class RecordBatchInputStream extends InputStream { switch (recordBatch.getFormat().toLowerCase()) { case "csv": - if (recordBatch.getAddDoubleQuotes()) { - bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep()); - } else { - bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep()); - } + bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep(), recordBatch.getAddDoubleQuotes()); break; case "json": try { diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java index 530657e..763d72b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -21,6 +21,8 @@ import 
com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.module.scala.DefaultScalaModule; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.sql.SchemaUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructField; @@ -29,6 +31,10 @@ import org.apache.spark.sql.types.StructType; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; public class DataUtil { @@ -36,36 +42,20 @@ public class DataUtil { public static final String NULL_VALUE = "\\N"; - public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep) { - StringBuilder builder = new StringBuilder(); + public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep, boolean quote) { StructField[] fields = schema.fields(); int n = row.numFields(); if (n > 0) { - builder.append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())); - int i = 1; - while (i < n) { - builder.append(sep); - builder.append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())); - i++; - } + return IntStream.range(0, row.numFields()).boxed().map(idx -> { + Object value = ObjectUtils.defaultIfNull(SchemaUtils.rowColumnValue(row, idx, fields[idx].dataType()), + NULL_VALUE); + if (quote) { + value = "\"" + value + "\""; + } + return value.toString(); + }).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8); } - return builder.toString().getBytes(StandardCharsets.UTF_8); - } - - public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) { - StringBuilder builder = new StringBuilder(); - StructField[] fields = schema.fields(); - int n = 
row.numFields(); - if (n > 0) { - builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())).append("\""); - int i = 1; - while (i < n) { - builder.append(sep); - builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\""); - i++; - } - } - return builder.toString().getBytes(StandardCharsets.UTF_8); + return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8); } public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException { diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java new file mode 100644 index 0000000..2dca9bb --- /dev/null +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import junit.framework.TestCase; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import scala.collection.JavaConversions; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +@RunWith(JUnit4.class) +public class DataUtilTest extends TestCase { + + private static final ObjectMapper MAPPER = JsonMapper.builder().build(); + + private List<Object> values; + + private StructType schema; + + @Override + @Before + public void setUp() throws Exception { + if (values == null) { + values = new LinkedList<>(); + values.add(1); + values.add(null); + values.add(UTF8String.fromString("abc")); + } + schema = new StructType(new StructField[]{ + StructField.apply("c1", DataTypes.IntegerType, true, Metadata.empty()), + StructField.apply("c2", DataTypes.StringType, true, Metadata.empty()), + StructField.apply("c3", DataTypes.StringType, true, Metadata.empty()) + }); + } + + + @Test + public void rowToCsvBytes() { + + InternalRow row = InternalRow.apply(JavaConversions.asScalaBuffer(values).toSeq()); + byte[] bytes = DataUtil.rowToCsvBytes(row, schema, ",", false); + Assert.assertArrayEquals("1,\\N,abc".getBytes(StandardCharsets.UTF_8), bytes); + byte[] bytes1 = DataUtil.rowToCsvBytes(row, schema, ",", true); + Assert.assertArrayEquals("\"1\",\"\\N\",\"abc\"".getBytes(StandardCharsets.UTF_8), bytes1); + 
} + + @Test + public void rowToJsonBytes() throws JsonProcessingException { + Map<String, Object> dataMap = new HashMap<>(values.size()); + dataMap.put("c1", 1); + dataMap.put("c2", null); + dataMap.put("c3", "abc"); + InternalRow row = InternalRow.apply(JavaConversions.asScalaBuffer(values).toSeq()); + byte[] bytes = DataUtil.rowToJsonBytes(row, schema); + Assert.assertArrayEquals(MAPPER.writeValueAsBytes(dataMap), bytes); + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org