This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f761ebb [Fix](connector) improve accept_any_schema to options (#326)
f761ebb is described below
commit f761ebb2caa135e9336cb41ec1278413b9e7933e
Author: wanggx <[email protected]>
AuthorDate: Wed Jul 2 11:37:43 2025 +0800
[Fix](connector) improve accept_any_schema to options (#326)
* make accept_any_schema configurable via options
* add an IT case for partial column updates
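For context, with this change the table only reports ACCEPT_ANY_SCHEMA when the sink property partial_columns is enabled, so a partial-column write from the DataFrame API looks roughly like the minimal sketch below. It mirrors the new DorisAnySchemaITCase rather than quoting it; the FE address, table identifier and credentials are placeholder values.

    // Minimal sketch (Scala), modeled on the new IT case; endpoint, table
    // identifier and credentials below are placeholders, not from this commit.
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.createDataFrame(Seq((0, 0, "user1", 100)))
      .toDF("siteid", "citycode", "username", "pv")
    df.write
      .format("doris")
      .option("doris.fenodes", "127.0.0.1:8030")                  // placeholder FE address
      .option("doris.table.identifier", "example_db.table5")      // placeholder table
      .option("user", "root")                                     // placeholder credentials
      .option("password", "")
      .option("doris.sink.properties.format", "json")
      .option("doris.sink.properties.partial_columns", "true")    // enables ACCEPT_ANY_SCHEMA
      .option("doris.write.fields", "siteid,citycode,username,pv")
      .mode(SaveMode.Append)
      .save()
    spark.stop()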
---
.../client/write/AbstractStreamLoadProcessor.java | 10 +-
.../apache/doris/spark/config/DorisOptions.java | 8 +
.../doris/spark/sql/DorisAnySchemaITCase.scala | 364 +++++++++++++++++++++
.../doris/spark/catalog/DorisTableBase.scala | 11 +-
4 files changed, 384 insertions(+), 9 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index b4dcbf1..f64d201 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -53,25 +53,23 @@ import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static org.apache.doris.spark.config.DorisOptions.GROUP_COMMIT;
+import static org.apache.doris.spark.config.DorisOptions.PARTIAL_COLUMNS;
+import static org.apache.doris.spark.config.DorisOptions.VALID_GROUP_MODE;
+
public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R>
implements DorisCommitter {
protected static final JsonMapper MAPPER =
JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).build();
- private static final String PARTIAL_COLUMNS = "partial_columns";
- private static final String GROUP_COMMIT = "group_commit";
- private static final Set<String> VALID_GROUP_MODE =
- new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
private static final int arrowBufferSize = 1000;
protected final Logger logger =
LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
protected final DorisConfig config;
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 5c31fa1..0be7acf 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -17,6 +17,10 @@
package org.apache.doris.spark.config;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
public class DorisOptions {
public static final ConfigOption<String> DORIS_FENODES =
ConfigOptions.name("doris.fenodes").stringType().withoutDefaultValue().withDescription("");
@@ -73,6 +77,10 @@ public class DorisOptions {
public static final ConfigOption<String> DORIS_MAX_FILTER_RATIO =
ConfigOptions.name("doris.max.filter.ratio").stringType().withoutDefaultValue().withDescription("");
public static final String STREAM_LOAD_PROP_PREFIX =
"doris.sink.properties.";
+ public static final String PARTIAL_COLUMNS = "partial_columns";
+ public static final String GROUP_COMMIT = "group_commit";
+ public static final Set<String> VALID_GROUP_MODE =
+ new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
public static final ConfigOption<Integer> DORIS_SINK_TASK_PARTITION_SIZE =
ConfigOptions.name("doris.sink.task.partition.size").intType().withoutDefaultValue().withDescription("");
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
new file mode 100644
index 0000000..f516eaa
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.Test
+import org.slf4j.LoggerFactory
+
+import java.util
+import scala.collection.JavaConverters._
+
+class DorisAnySchemaITCase extends AbstractContainerTestBase {
+
+ private val LOG = LoggerFactory.getLogger(classOf[DorisAnySchemaITCase])
+
+ val DATABASE: String = "example_db"
+ /*
+ * CREATE TABLE table4
+ * (
+ * siteid INT DEFAULT '10',
+ * citycode SMALLINT,
+ * username VARCHAR(32) DEFAULT '',
+ * pv BIGINT DEFAULT '0'
+ * )
+ * UNIQUE KEY(siteid, citycode, username)
+ * DISTRIBUTED BY HASH(siteid) BUCKETS 10
+ * PROPERTIES("replication_num" = "1");
+ */
+ val dorisTable = "table4"
+
+ /*
+ * CREATE TABLE table2
+ * (
+ * siteid INT DEFAULT '10',
+ * citycode SMALLINT,
+ * username VARCHAR(32) DEFAULT '',
+ * pv BIGINT DEFAULT '0'
+ * )
+ * UNIQUE KEY(siteid, citycode, username)
+ * DISTRIBUTED BY HASH(siteid) BUCKETS 10
+ * PROPERTIES("replication_num" = "1");
+ */
+ val dorisSourceTable = "table2"
+
+ /*
+ * CREATE TABLE table5
+ * (
+ * siteid INT DEFAULT '10',
+ * citycode SMALLINT,
+ * username VARCHAR(32) DEFAULT '',
+ * pv BIGINT DEFAULT '0',
+ * p_value BIGINT DEFAULT '0'
+ * )
+ * UNIQUE KEY(siteid, citycode, username)
+ * DISTRIBUTED BY HASH(siteid) BUCKETS 10
+ * PROPERTIES("replication_num" = "1");
+ */
+ val dorisPartialTable = "table5"
+
+ @Test
+ def jsonDataWriteTest(): Unit = {
+ initializeTable(dorisTable, DataModel.UNIQUE)
+
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val df = spark.createDataFrame(Seq(
+ (0, 0, "user1", 100),
+ (1, 0, "user2", 100),
+ (3, 0, "user2", 200)
+ )).toDF("siteid", "citycode", "username", "pv")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." + dorisTable)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("doris.sink.properties.format", "json")
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
+ .mode(SaveMode.Append)
+ .save()
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, dorisTable),
+ 4)
+ val expected = util.Arrays.asList("0,0,user1,100", "1,0,user2,100", "3,0,user2,200");
+ checkResultInAnyOrder("jsonDataWriteTest", expected.toArray, actual.toArray)
+ } finally {
+ spark.stop()
+ }
+ }
+
+ @Test
+ def jsonDataWriteWithPartialUpdateTest(): Unit = {
+ initializePartialTable(dorisPartialTable, DataModel.UNIQUE)
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val df = spark.createDataFrame(Seq(
+ (0, 0, "user4", 100),
+ (1, 0, "user5", 100),
+ (3, 0, "user6", 200)
+ )).toDF("siteid", "citycode", "username", "pv")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." + dorisPartialTable)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("doris.sink.properties.format", "json")
+ .option("doris.sink.properties.partial_columns", "true")
+ .option("doris.write.fields", "siteid,citycode,username,pv")
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
+ .mode(SaveMode.Append)
+ .save()
+ spark.stop()
+ Thread.sleep(2000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, dorisPartialTable),
+ 5)
+ val expected = util.Arrays.asList("0,0,user4,100,0", "1,0,user5,100,0", "3,0,user6,200,0");
+ checkResultInAnyOrder("jsonDataWriteWithPartialUpdateTest", expected.toArray, actual.toArray)
+ } finally {
+ spark.stop()
+ }
+ }
+
+ @Test
+ def jsonDataWriteSqlTest(): Unit = {
+ initializeTable(dorisTable, DataModel.UNIQUE)
+ initializeTable(dorisSourceTable, DataModel.UNIQUE)
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val doris = spark.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_lh
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + dorisTable}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}"
+ |)
+ |""".stripMargin)
+ spark.sql(
+ """
+ |insert into test_lh values (0, 0, "user1", 100), (1, 0, "user2", 100),(2, 1, "user3", 100),(3, 0, "user2", 200)
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |CREATE TEMPORARY VIEW table2
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + dorisSourceTable}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}"
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ """
+ |insert into table2 values (0, 0, "user1", 100), (1, 0, "user2", 100),(2, 1, "user3", 100),(3, 0, "user2", 200)
+ |""".stripMargin)
+
+ spark.sql(
+ """
+ |insert into test_lh select siteid, citycode + 1, username, pv + 1 from table2
+ |""".stripMargin)
+
+ Thread.sleep(2000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, dorisTable),
+ 4)
+ val expected = util.Arrays.asList("0,0,user1,100", "1,0,user2,100", "2,1,user3,100", "3,0,user2,200",
+ "0,1,user1,101", "1,1,user2,101", "2,2,user3,101", "3,1,user2,201");
+ checkResultInAnyOrder("jsonDataWriteSqlTest", expected.toArray, actual.toArray)
+ } finally {
+ spark.stop()
+ }
+ }
+
+ @Test
+ def jsonDataWriteWithPartialUpdateSqlTest(): Unit = {
+ initializePartialTable(dorisPartialTable, DataModel.UNIQUE)
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val version = spark.version
+ LOG.info("spark version: " + version)
+ if (StringUtils.startsWith(version, "2")
+ || StringUtils.startsWith(version, "3.1")
+ || StringUtils.startsWith(version, "3.2")
+ || StringUtils.startsWith(version, "3.4")) {
+ LOG.warn("sql partial_columns is only support in spark3.3/3.5+")
+ return
+ }
+ val doris = spark.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_lh
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + dorisPartialTable}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.sink.properties.format" = "csv",
+ | "doris.sink.properties.partial_columns" = "true",
+ | "doris.write.fields" = "siteid,citycode,username,pv"
+ |)
+ |""".stripMargin)
+ spark.sql(
+ """
+ |INSERT INTO test_lh (siteid, citycode, username, pv) VALUES (0, 0, 'user1',3)
+ |""".stripMargin)
+
+ Thread.sleep(2000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, dorisPartialTable),
+ 5)
+ val expected = util.Arrays.asList("0,0,user1,3,0");
+ checkResultInAnyOrder("jsonDataWriteWithPartialUpdateSqlTest", expected.toArray, actual.toArray)
+ } finally {
+ spark.stop()
+ }
+ }
+
+ @Test
+ def jsonDataWriteWithPartialUpdateSqlTest1(): Unit = {
+ initializePartialTable(dorisPartialTable, DataModel.UNIQUE)
+ initializeTable(dorisSourceTable, DataModel.UNIQUE)
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val version = spark.version
+ LOG.info("spark version: " + version)
+ if (StringUtils.startsWith(version, "2")
+ || StringUtils.startsWith(version, "3.1")
+ || StringUtils.startsWith(version, "3.2")
+ || StringUtils.startsWith(version, "3.4")) {
+ LOG.warn("sql partial_columns is only support in spark3.3/3.5+")
+ return
+ }
+ val doris = spark.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_lh
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + dorisPartialTable}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.sink.properties.format" = "json",
+ | "doris.sink.properties.partial_columns" = "true"
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |CREATE TEMPORARY VIEW table2
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + dorisSourceTable}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}"
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ """
+ |INSERT INTO table2 (siteid, citycode, username, pv) VALUES (0, 0, 'user1',3)
+ |""".stripMargin)
+
+ spark.sql(
+ """
+ |insert into test_lh(siteid,citycode,username, pv) select siteid,citycode+1 as citycode,username,pv+1 as pv from table2
+ |""".stripMargin)
+ Thread.sleep(2000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, dorisPartialTable),
+ 5)
+ val expected = util.Arrays.asList("0,1,user1,4,0");
+ checkResultInAnyOrder("jsonDataWriteWithPartialUpdateSqlTest1", expected.toArray, actual.toArray)
+ } finally {
+ spark.stop()
+ }
+ }
+
+ private def initializeTable(table: String, dataModel: DataModel): Unit = {
+ val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+ val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format("CREATE TABLE %s.%s ( \n"
+ + " siteid INT DEFAULT '10',"
+ + " citycode SMALLINT, "
+ + " username VARCHAR(32) DEFAULT '',"
+ + " pv BIGINT DEFAULT '0' "
+ + " )"
+ + " %s KEY(siteid, citycode, username) "
+ + " DISTRIBUTED BY HASH(`siteid`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table,
model))
+ }
+
+ private def initializePartialTable(table: String, dataModel: DataModel): Unit = {
+ val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+ val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format("CREATE TABLE %s.%s ( \n"
+ + " siteid INT DEFAULT '10',"
+ + " citycode SMALLINT, "
+ + " username VARCHAR(32) DEFAULT '',"
+ + " pv BIGINT DEFAULT '0', "
+ + " p_value BIGINT DEFAULT '0' "
+ + " )"
+ + " %s KEY(siteid, citycode, username) "
+ + " DISTRIBUTED BY HASH(`siteid`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table,
model))
+ }
+
+ private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
+ LOG.info("Checking DorisAnySchemaITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+ assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
+ }
+}
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 50af0fa..90da702 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.language.implicitConversions
@@ -45,11 +46,15 @@ abstract class DorisTableBase(identifier: Identifier, config: DorisConfig, schem
})
override def capabilities(): util.Set[TableCapability] = {
- Set(BATCH_READ,
+ val capabilities = mutable.Set(BATCH_READ,
BATCH_WRITE,
STREAMING_WRITE,
- ACCEPT_ANY_SCHEMA,
- TRUNCATE).asJava
+ TRUNCATE)
+ val properties = config.getSinkProperties
+ if (properties.containsKey(DorisOptions.PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(DorisOptions.PARTIAL_COLUMNS))) {
+ capabilities += ACCEPT_ANY_SCHEMA
+ }
+ capabilities.asJava
}
override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]