This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new e0c67fb [feature] support spark array type write (#127) e0c67fb is described below commit e0c67fb871bae718023d5f5b30011ed2c2a3c278 Author: gnehil <adamlee...@gmail.com> AuthorDate: Tue Aug 15 15:13:08 2023 +0800 [feature] support spark array type write (#127) --- .../apache/doris/spark/load/DorisStreamLoad.java | 16 ++++--- .../java/org/apache/doris/spark/util/DataUtil.java | 50 ++++++++++++++++++++++ .../org/apache/doris/spark/util/DataUtilTest.java | 32 ++++++++++++++ 3 files changed, 91 insertions(+), 7 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 3708c55..4a7b1e0 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -22,6 +22,7 @@ import org.apache.doris.spark.exception.StreamLoadException; import org.apache.doris.spark.rest.RestService; import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.RespContent; +import org.apache.doris.spark.util.DataUtil; import org.apache.doris.spark.util.ListUtils; import org.apache.doris.spark.util.ResponseUtil; @@ -380,7 +381,13 @@ public class DorisStreamLoad implements Serializable { switch (fileType.toUpperCase()) { case "CSV": - loadDataList = Collections.singletonList(rows.stream().map(row -> row.stream().map(field -> field == null ? NULL_VALUE : field.toString()).collect(Collectors.joining(FIELD_DELIMITER))).collect(Collectors.joining(LINE_DELIMITER))); + loadDataList = Collections.singletonList( + rows.stream() + .map(row -> row.stream() + .map(DataUtil::handleColumnValue) + .map(Object::toString) + .collect(Collectors.joining(FIELD_DELIMITER)) + ).collect(Collectors.joining(LINE_DELIMITER))); break; case "JSON": List<Map<Object, Object>> dataList = new ArrayList<>(); @@ -389,12 +396,7 @@ public class DorisStreamLoad implements Serializable { Map<Object, Object> dataMap = new HashMap<>(); if (dfColumns.length == row.size()) { for (int i = 0; i < dfColumns.length; i++) { - Object col = row.get(i); - if (col instanceof Timestamp) { - dataMap.put(dfColumns[i], col.toString()); - continue; - } - dataMap.put(dfColumns[i], col); + dataMap.put(dfColumns[i], DataUtil.handleColumnValue(row.get(i))); } } dataList.add(dataMap); diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java new file mode 100644 index 0000000..ed9af50 --- /dev/null +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.util; + +import scala.collection.JavaConversions; +import scala.collection.mutable.WrappedArray; + +import java.sql.Timestamp; +import java.util.Arrays; + +public class DataUtil { + + public static final String NULL_VALUE = "\\N"; + + public static Object handleColumnValue(Object value) { + + if (value == null) { + return NULL_VALUE; + } + + if (value instanceof Timestamp) { + return value.toString(); + } + + if (value instanceof WrappedArray) { + + Object[] arr = JavaConversions.seqAsJavaList((WrappedArray) value).toArray(); + return Arrays.toString(arr); + } + + return value; + + } + +} diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java new file mode 100644 index 0000000..020a241 --- /dev/null +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.util; + +import junit.framework.TestCase; +import org.junit.Assert; +import scala.collection.mutable.WrappedArray; + +import java.sql.Timestamp; + +public class DataUtilTest extends TestCase { + + public void testHandleColumnValue() { + Assert.assertEquals("2023-08-14 18:00:00.0", DataUtil.handleColumnValue(Timestamp.valueOf("2023-08-14 18:00:00"))); + Assert.assertEquals("[1, 2, 3]", DataUtil.handleColumnValue(WrappedArray.make(new Integer[]{1,2,3}))); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org