This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 86de79e  [improvement] json format support json by line mode (#120)
86de79e is described below

commit 86de79ef8a59e49de99edc86ca85b7ac9c949bcb
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Jul 26 17:21:44 2023 +0800

    [improvement] json format support json by line mode (#120)
---
 .../apache/doris/spark/load/DorisStreamLoad.java   | 19 +++++++------
 .../org/apache/doris/spark/util/ListUtils.java     | 33 ++++++++++++++++++----
 .../org/apache/doris/spark/util/TestListUtils.java |  4 +--
 .../doris/spark/sql/TestConnectorWriteDoris.scala  | 23 +++++++++++++++
 4 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 07e6624..e1c1bc1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
  **/
 public class DorisStreamLoad implements Serializable {
     private String FIELD_DELIMITER;
-    private String LINE_DELIMITER;
+    private final String LINE_DELIMITER;
     private static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -89,6 +89,8 @@ public class DorisStreamLoad implements Serializable {
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
     private final String fileType;
 
+    private boolean readJsonByLine = false;
+
     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = 
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
@@ -105,6 +107,12 @@ public class DorisStreamLoad implements Serializable {
         fileType = streamLoadProp.getOrDefault("format", "csv");
         if ("csv".equals(fileType)){
             FIELD_DELIMITER = 
EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", 
"\t"));
+        } else if ("json".equalsIgnoreCase(fileType)) {
+            readJsonByLine = 
Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
+            boolean stripOuterArray = 
Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
+            if (readJsonByLine && stripOuterArray) {
+                throw new IllegalArgumentException("Only one of options 
'read_json_by_line' and 'strip_outer_array' can be set to true");
+            }
         }
         LINE_DELIMITER = 
EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
     }
@@ -134,12 +142,7 @@ public class DorisStreamLoad implements Serializable {
             httpPut.setHeader("max_filter_ratio", maxFilterRatio);
         }
         if (MapUtils.isNotEmpty(streamLoadProp)) {
-            streamLoadProp.entrySet().stream()
-                    .filter(entry -> 
!"read_json_by_line".equals(entry.getKey()))
-                    .forEach(entry -> httpPut.setHeader(entry.getKey(), 
entry.getValue()));
-        }
-        if (fileType.equals("json")) {
-            httpPut.setHeader("strip_outer_array", "true");
+            streamLoadProp.forEach(httpPut::setHeader);
         }
         return httpPut;
     }
@@ -195,7 +198,7 @@ public class DorisStreamLoad implements Serializable {
                 throw new StreamLoadException("The number of configured 
columns does not match the number of data columns.");
             }
             // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
-            List<String> serializedList = 
ListUtils.getSerializedList(dataList);
+            List<String> serializedList = 
ListUtils.getSerializedList(dataList, readJsonByLine ? LINE_DELIMITER : null);
             for (String serializedRows : serializedList) {
                 load(serializedRows);
             }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
index 46a37ff..d8d31b9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,9 +33,10 @@ public class ListUtils {
 
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
-    public static List<String> getSerializedList(List<Map<Object, Object>> 
batch) throws JsonProcessingException {
+    public static List<String> getSerializedList(List<Map<Object, Object>> 
batch,
+                                                 String lineDelimiter) throws 
JsonProcessingException {
         List<String> result = new ArrayList<>();
-        divideAndSerialize(batch, result);
+        divideAndSerialize(batch, result, lineDelimiter);
         return result;
     }
 
@@ -46,8 +46,9 @@ public class ListUtils {
      * @param result
      * @throws JsonProcessingException
      */
-    public static void divideAndSerialize(List<Map<Object, Object>> batch, 
List<String> result) throws JsonProcessingException {
-        String serializedResult = MAPPER.writeValueAsString(batch);
+    public static void divideAndSerialize(List<Map<Object, Object>> batch, 
List<String> result, String lineDelimiter)
+            throws JsonProcessingException {
+        String serializedResult = generateSerializedResult(batch, 
lineDelimiter);
         // if an error occurred in the batch call to getBytes ,average divide the batch
         try {
             //the "Requested array size exceeds VM limit" exception occurs when the collection is large
@@ -58,7 +59,7 @@ public class ListUtils {
             LOG.error("getBytes error:{} ,average divide the collection", 
ExceptionUtils.getStackTrace(error));
         }
         for (List<Map<Object, Object>> avgSubCollection : 
getAvgSubCollections(batch)) {
-            divideAndSerialize(avgSubCollection, result);
+            divideAndSerialize(avgSubCollection, result, lineDelimiter);
         }
     }
 
@@ -70,4 +71,24 @@ public class ListUtils {
     public static List<List<Map<Object, Object>>> 
getAvgSubCollections(List<Map<Object, Object>> values) {
         return Lists.partition(values, (values.size() + 1) / 2);
     }
+
+    private static String generateSerializedResult(List<Map<Object, Object>> 
batch, String lineDelimiter)
+            throws JsonProcessingException {
+
+        // when lineDelimiter is null, use strip_outer_array mode, otherwise use json_by_line mode
+        if (lineDelimiter == null) {
+            return MAPPER.writeValueAsString(batch);
+        } else {
+            StringBuilder builder = new StringBuilder();
+            for (Map<Object, Object> data : batch) {
+                
builder.append(MAPPER.writeValueAsString(data)).append(lineDelimiter);
+            }
+            int lastIdx = builder.lastIndexOf(lineDelimiter);
+            if (lastIdx != -1) {
+                return builder.substring(0, lastIdx);
+            }
+            return builder.toString();
+        }
+    }
+
 }
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
index c0ec102..4e36418 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
@@ -34,9 +34,9 @@ public class TestListUtils {
             Map<Object, Object> entity = new HashMap<>();
             batch.add(entity);
         }
-        Assert.assertEquals(ListUtils.getSerializedList(batch).size(), 1);
+        Assert.assertEquals(ListUtils.getSerializedList(batch, "\n").size(), 
1);
 
-        Assert.assertEquals(ListUtils.getSerializedList(new 
ArrayList<>()).size(), 1);
+        Assert.assertEquals(ListUtils.getSerializedList(new ArrayList<>(), 
"\n").size(), 1);
 
     }
 }
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
index 26f89af..ae3b066 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -108,4 +108,27 @@ class TestConnectorWriteDoris {
       .start().awaitTermination()
   }
 
+  @Test
+  def jsonWriteTest(): Unit = {
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = spark.createDataFrame(Seq(
+      ("1", 100, "待付款"),
+      ("2", 200, "待发货"),
+      ("3", 300, "已收货")
+    )).toDF("order_id", "order_amount", "order_status")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
+      .option("sink.properties.format", "json")
+      // .option("sink.properties.read_json_by_line", "true")
+      .option("sink.properties.strip_outer_array", "true")
+      .save()
+    spark.stop()
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to