This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 86de79e  [improvement] json format support json by line mode (#120)
86de79e is described below

commit 86de79ef8a59e49de99edc86ca85b7ac9c949bcb
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Jul 26 17:21:44 2023 +0800

    [improvement] json format support json by line mode (#120)
---
 .../apache/doris/spark/load/DorisStreamLoad.java   | 19 +++++++------
 .../org/apache/doris/spark/util/ListUtils.java     | 33 ++++++++++++++++++----
 .../org/apache/doris/spark/util/TestListUtils.java |  4 +--
 .../doris/spark/sql/TestConnectorWriteDoris.scala  | 23 +++++++++++++++
 4 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 07e6624..e1c1bc1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
  **/
 public class DorisStreamLoad implements Serializable {
     private String FIELD_DELIMITER;
-    private String LINE_DELIMITER;
+    private final String LINE_DELIMITER;
     private static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -89,6 +89,8 @@ public class DorisStreamLoad implements Serializable {
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
     private final String fileType;
 
+    private boolean readJsonByLine = false;
+
     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
@@ -105,6 +107,12 @@
         fileType = streamLoadProp.getOrDefault("format", "csv");
         if ("csv".equals(fileType)){
             FIELD_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+        } else if ("json".equalsIgnoreCase(fileType)) {
+            readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
+            boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
+            if (readJsonByLine && stripOuterArray) {
+                throw new IllegalArgumentException("Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true");
+            }
         }
         LINE_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
     }
@@ -134,12 +142,7 @@
             httpPut.setHeader("max_filter_ratio", maxFilterRatio);
         }
         if (MapUtils.isNotEmpty(streamLoadProp)) {
-            streamLoadProp.entrySet().stream()
-                    .filter(entry -> !"read_json_by_line".equals(entry.getKey()))
-                    .forEach(entry -> httpPut.setHeader(entry.getKey(), entry.getValue()));
-        }
-        if (fileType.equals("json")) {
-            httpPut.setHeader("strip_outer_array", "true");
+            streamLoadProp.forEach(httpPut::setHeader);
         }
         return httpPut;
     }
@@ -195,7 +198,7 @@
             throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
         }
         // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
-        List<String> serializedList = ListUtils.getSerializedList(dataList);
+        List<String> serializedList = ListUtils.getSerializedList(dataList, readJsonByLine ? LINE_DELIMITER : null);
         for (String serializedRows : serializedList) {
             load(serializedRows);
         }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
index 46a37ff..d8d31b9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,9 +33,10 @@ public class ListUtils {
 
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
-    public static List<String> getSerializedList(List<Map<Object, Object>> batch) throws JsonProcessingException {
+    public static List<String> getSerializedList(List<Map<Object, Object>> batch,
+                                                 String lineDelimiter) throws JsonProcessingException {
         List<String> result = new ArrayList<>();
-        divideAndSerialize(batch, result);
+        divideAndSerialize(batch, result, lineDelimiter);
         return result;
     }
 
@@ -46,8 +46,9 @@
      * @param result
      * @throws JsonProcessingException
      */
-    public static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result) throws JsonProcessingException {
-        String serializedResult = MAPPER.writeValueAsString(batch);
+    public static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result, String lineDelimiter)
+            throws JsonProcessingException {
+        String serializedResult = generateSerializedResult(batch, lineDelimiter);
         // if an error occurred in the batch call to getBytes ,average divide the batch
         try {
             //the "Requested array size exceeds VM limit" exception occurs when the collection is large
@@ -58,7 +59,7 @@
             LOG.error("getBytes error:{} ,average divide the collection", ExceptionUtils.getStackTrace(error));
         }
         for (List<Map<Object, Object>> avgSubCollection : getAvgSubCollections(batch)) {
-            divideAndSerialize(avgSubCollection, result);
+            divideAndSerialize(avgSubCollection, result, lineDelimiter);
         }
     }
@@ -70,4 +71,24 @@
     public static List<List<Map<Object, Object>>> getAvgSubCollections(List<Map<Object, Object>> values) {
         return Lists.partition(values, (values.size() + 1) / 2);
     }
+
+    private static String generateSerializedResult(List<Map<Object, Object>> batch, String lineDelimiter)
+            throws JsonProcessingException {
+
+        // when lineDelimiter is null, use strip_outer_array mode, otherwise use json_by_line mode
+        if (lineDelimiter == null) {
+            return MAPPER.writeValueAsString(batch);
+        } else {
+            StringBuilder builder = new StringBuilder();
+            for (Map<Object, Object> data : batch) {
+                builder.append(MAPPER.writeValueAsString(data)).append(lineDelimiter);
+            }
+            int lastIdx = builder.lastIndexOf(lineDelimiter);
+            if (lastIdx != -1) {
+                return builder.substring(0, lastIdx);
+            }
+            return builder.toString();
+        }
+    }
+
 }
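The new generateSerializedResult method is the core of this patch, and a minimal standalone sketch of the two payload shapes it can produce follows. The sketch is illustrative only, not part of the commit: the JsonModeDemo class and its sample rows are invented here, and it assumes Jackson on the classpath, as the connector itself already has. Passing a null delimiter gives the strip_outer_array shape, one JSON array for the whole batch; passing a line delimiter gives the read_json_by_line shape, one JSON object per row.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative demo, not connector code.
public class JsonModeDemo {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // lineDelimiter == null -> strip_outer_array mode (whole batch as one JSON array);
    // otherwise -> read_json_by_line mode (one JSON object per row, joined by the delimiter).
    static String serialize(List<Map<Object, Object>> batch, String lineDelimiter)
            throws JsonProcessingException {
        if (lineDelimiter == null) {
            return MAPPER.writeValueAsString(batch);
        }
        StringBuilder builder = new StringBuilder();
        for (Map<Object, Object> row : batch) {
            builder.append(MAPPER.writeValueAsString(row)).append(lineDelimiter);
        }
        // drop the trailing delimiter, as generateSerializedResult does
        int lastIdx = builder.lastIndexOf(lineDelimiter);
        return lastIdx != -1 ? builder.substring(0, lastIdx) : builder.toString();
    }

    public static void main(String[] args) throws JsonProcessingException {
        List<Map<Object, Object>> batch = new ArrayList<>();
        for (int i = 1; i <= 2; i++) {
            Map<Object, Object> row = new LinkedHashMap<>(); // keeps key order stable for the demo
            row.put("order_id", String.valueOf(i));
            row.put("order_amount", i * 100);
            batch.add(row);
        }
        // prints [{"order_id":"1","order_amount":100},{"order_id":"2","order_amount":200}]
        System.out.println(serialize(batch, null));
        // prints {"order_id":"1","order_amount":100}
        //        {"order_id":"2","order_amount":200}
        System.out.println(serialize(batch, "\n"));
    }
}

The trailing-delimiter trim works because the loop always appends exactly one delimiter after the last row, so the last occurrence found by lastIndexOf is necessarily that trailing one; an empty batch yields an empty string.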
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
index c0ec102..4e36418 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
@@ -34,9 +34,9 @@ public class TestListUtils {
             Map<Object, Object> entity = new HashMap<>();
             batch.add(entity);
         }
-        Assert.assertEquals(ListUtils.getSerializedList(batch).size(), 1);
+        Assert.assertEquals(ListUtils.getSerializedList(batch, "\n").size(), 1);
 
-        Assert.assertEquals(ListUtils.getSerializedList(new ArrayList<>()).size(), 1);
+        Assert.assertEquals(ListUtils.getSerializedList(new ArrayList<>(), "\n").size(), 1);
 
     }
 }
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
index 26f89af..ae3b066 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -108,4 +108,27 @@ class TestConnectorWriteDoris {
       .start().awaitTermination()
   }
 
+  @Test
+  def jsonWriteTest(): Unit = {
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = spark.createDataFrame(Seq(
+      ("1", 100, "待付款"),
+      ("2", 200, "待发货"),
+      ("3", 300, "已收货")
+    )).toDF("order_id", "order_amount", "order_status")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
+      .option("sink.properties.format", "json")
+      // .option("sink.properties.read_json_by_line", "true")
+      .option("sink.properties.strip_outer_array", "true")
+      .save()
+    spark.stop()
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org