This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 5c9ef61 [improvement] add new param for ignored type (#113) 5c9ef61 is described below commit 5c9ef61506b1224352367324053be2b689d103fd Author: gnehil <adamlee...@gmail.com> AuthorDate: Tue Jun 13 14:42:59 2023 +0800 [improvement] add new param for ignored type (#113) * add new param doris.ignore-type to ignore columns of specified type when creating relation * add comment --- .../doris/spark/cfg/ConfigurationOptions.java | 7 +++++++ .../org/apache/doris/spark/sql/SchemaUtils.scala | 19 ++++++++++++++----- .../apache/doris/spark/sql/TestSchemaUtils.scala | 22 +++++++++++++++++++++- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index a3f4061..9783c46 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -90,4 +90,11 @@ public interface ConfigurationOptions { int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50; + /** + * set types to ignore, split by comma + * e.g. 
+ * "doris.ignore-type"="bitmap,hll" + */ + String DORIS_IGNORE_TYPE = "doris.ignore-type"; + } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 0535805..c7fad41 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -18,12 +18,13 @@ package org.apache.doris.spark.sql import org.apache.doris.sdk.thrift.TScanColumnDesc + import scala.collection.JavaConversions._ import org.apache.doris.spark.cfg.Settings import org.apache.doris.spark.exception.DorisException import org.apache.doris.spark.rest.RestService import org.apache.doris.spark.rest.models.{Field, Schema} -import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD +import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD} import org.apache.spark.sql.types._ import org.slf4j.LoggerFactory @@ -38,7 +39,9 @@ private[spark] object SchemaUtils { */ def discoverSchema(cfg: Settings): StructType = { val schema = discoverSchemaFromFe(cfg) - convertToStruct(cfg.getProperty(DORIS_READ_FIELD), schema) + val dorisReadField = cfg.getProperty(DORIS_READ_FIELD) + val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE) + convertToStruct(schema, dorisReadField, ignoreColumnType) } /** @@ -57,14 +60,20 @@ private[spark] object SchemaUtils { * @param schema inner schema * @return Spark Catalyst StructType */ - def convertToStruct(dorisReadFields: String, schema: Schema): StructType = { - val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) { + def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = { + val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) { dorisReadFields.split(",") } else { Array.empty[String] } + val ignoredTypeList = 
if (ignoredTypes != null && ignoredTypes.nonEmpty) { + ignoredTypes.split(",").map(t => t.trim.toUpperCase) + } else { + Array.empty[String] + } val fields = schema.getProperties - .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty) + .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty) + && !ignoredTypeList.contains(x.getType)) .map(f => DataTypes.createStructField( f.getName, diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala index f944aaf..e11fb4f 100644 --- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala +++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala @@ -41,7 +41,7 @@ class TestSchemaUtils extends ExpectedExceptionTest { fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true) fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true) val expected = DataTypes.createStructType(fields.asJava) - Assert.assertEquals(expected, SchemaUtils.convertToStruct("k1,k5", schema)) + Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5", null)) } @Test @@ -93,4 +93,24 @@ class TestSchemaUtils extends ExpectedExceptionTest { Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2))) } + + @Test + def testIgnoreTypes(): Unit = { + + val schema = new Schema + schema.setStatus(200) + val col1 = new Field("col1", "TINYINT", "", 0, 0, "") + val col2 = new Field("col2", "BITMAP", "", 0, 0, "") + val col3 = new Field("col3", "HLL", "", 0, 0, "") + schema.put(col1) + schema.put(col2) + schema.put(col3) + + var fields = List[StructField]() + fields :+= DataTypes.createStructField("col1", DataTypes.ByteType, true) + val expected = DataTypes.createStructType(fields.asJava) + Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, null, "bitmap,hll")) + + } + } 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org