This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2064f43  [fix] null value is written as null string in csv format 
(#166)
2064f43 is described below

commit 2064f4395a2e9d7fafbd58daab8123f1569cceef
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Tue Dec 5 15:03:15 2023 +0800

    [fix] null value is written as null string in csv format (#166)
---
 .../doris/spark/load/RecordBatchInputStream.java   |  6 +-
 .../java/org/apache/doris/spark/util/DataUtil.java | 42 ++++------
 .../org/apache/doris/spark/util/DataUtilTest.java  | 90 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 31 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index 047ac3b..544e683 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -164,11 +164,7 @@ public class RecordBatchInputStream extends InputStream {
 
         switch (recordBatch.getFormat().toLowerCase()) {
             case "csv":
-                if (recordBatch.getAddDoubleQuotes()) {
-                    bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, 
recordBatch.getSchema(), recordBatch.getSep());
-                } else {
-                    bytes = DataUtil.rowToCsvBytes(row, 
recordBatch.getSchema(), recordBatch.getSep());
-                }
+                bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), 
recordBatch.getSep(), recordBatch.getAddDoubleQuotes());
                 break;
             case "json":
                 try {
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index 530657e..763d72b 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.module.scala.DefaultScalaModule;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.sql.SchemaUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructField;
@@ -29,6 +31,10 @@ import org.apache.spark.sql.types.StructType;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 public class DataUtil {
 
@@ -36,36 +42,20 @@ public class DataUtil {
 
     public static final String NULL_VALUE = "\\N";
 
-    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, 
String sep) {
-        StringBuilder builder = new StringBuilder();
+    public static byte[] rowToCsvBytes(InternalRow row, StructType schema, 
String sep, boolean quote) {
         StructField[] fields = schema.fields();
         int n = row.numFields();
         if (n > 0) {
-            builder.append(SchemaUtils.rowColumnValue(row, 0, 
fields[0].dataType()));
-            int i = 1;
-            while (i < n) {
-                builder.append(sep);
-                builder.append(SchemaUtils.rowColumnValue(row, i, 
fields[i].dataType()));
-                i++;
-            }
+            return IntStream.range(0, row.numFields()).boxed().map(idx -> {
+                Object value = 
ObjectUtils.defaultIfNull(SchemaUtils.rowColumnValue(row, idx, 
fields[idx].dataType()),
+                        NULL_VALUE);
+                if (quote) {
+                    value = "\"" + value + "\"";
+                }
+                return value.toString();
+            
}).collect(Collectors.joining(sep)).getBytes(StandardCharsets.UTF_8);
         }
-        return builder.toString().getBytes(StandardCharsets.UTF_8);
-    }
-
-    public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, 
StructType schema, String sep) {
-        StringBuilder builder = new StringBuilder();
-        StructField[] fields = schema.fields();
-        int n = row.numFields();
-        if (n > 0) {
-            builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, 
fields[0].dataType())).append("\"");
-            int i = 1;
-            while (i < n) {
-                builder.append(sep);
-                builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, 
fields[i].dataType())).append("\"");
-                i++;
-            }
-        }
-        return builder.toString().getBytes(StandardCharsets.UTF_8);
+        return StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8);
     }
 
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema) 
throws JsonProcessingException {
diff --git 
a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java
 
b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java
new file mode 100644
index 0000000..2dca9bb
--- /dev/null
+++ 
b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import junit.framework.TestCase;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import scala.collection.JavaConversions;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(JUnit4.class)
+public class DataUtilTest extends TestCase {
+
+    private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+
+    private List<Object> values;
+
+    private StructType schema;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        if (values == null) {
+            values = new LinkedList<>();
+            values.add(1);
+            values.add(null);
+            values.add(UTF8String.fromString("abc"));
+        }
+        schema = new StructType(new StructField[]{
+                StructField.apply("c1", DataTypes.IntegerType, true, 
Metadata.empty()),
+                StructField.apply("c2", DataTypes.StringType, true, 
Metadata.empty()),
+                StructField.apply("c3", DataTypes.StringType, true, 
Metadata.empty())
+        });
+    }
+
+
+    @Test
+    public void rowToCsvBytes() {
+
+        InternalRow row = 
InternalRow.apply(JavaConversions.asScalaBuffer(values).toSeq());
+        byte[] bytes = DataUtil.rowToCsvBytes(row, schema, ",", false);
+        Assert.assertArrayEquals("1,\\N,abc".getBytes(StandardCharsets.UTF_8), 
bytes);
+        byte[] bytes1 = DataUtil.rowToCsvBytes(row, schema, ",", true);
+        
Assert.assertArrayEquals("\"1\",\"\\N\",\"abc\"".getBytes(StandardCharsets.UTF_8),
 bytes1);
+    }
+
+    @Test
+    public void rowToJsonBytes() throws JsonProcessingException {
+        Map<String, Object> dataMap = new HashMap<>(values.size());
+        dataMap.put("c1", 1);
+        dataMap.put("c2", null);
+        dataMap.put("c3", "abc");
+        InternalRow row = 
InternalRow.apply(JavaConversions.asScalaBuffer(values).toSeq());
+        byte[] bytes = DataUtil.rowToJsonBytes(row, schema);
+        Assert.assertArrayEquals(MAPPER.writeValueAsBytes(dataMap), bytes);
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to