This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9ef61  [improvement] add new param for ignored type (#113)
5c9ef61 is described below

commit 5c9ef61506b1224352367324053be2b689d103fd
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Tue Jun 13 14:42:59 2023 +0800

    [improvement] add new param for ignored type (#113)
    
    * add new param doris.ignore-type to ignore columns of specified type when creating relation
    * add comment
---
 .../doris/spark/cfg/ConfigurationOptions.java      |  7 +++++++
 .../org/apache/doris/spark/sql/SchemaUtils.scala   | 19 ++++++++++++++-----
 .../apache/doris/spark/sql/TestSchemaUtils.scala   | 22 +++++++++++++++++++++-
 3 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a3f4061..9783c46 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,4 +90,11 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
+    /**
+     * set types to ignore, split by comma
+     * e.g.
+     * "doris.ignore-type"="bitmap,hll"
+     */
+    String DORIS_IGNORE_TYPE = "doris.ignore-type";
+
 }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 0535805..c7fad41 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -18,12 +18,13 @@
 package org.apache.doris.spark.sql
 
 import org.apache.doris.sdk.thrift.TScanColumnDesc
+
 import scala.collection.JavaConversions._
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
 import org.apache.doris.spark.rest.models.{Field, Schema}
-import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
+import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
 import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory
 
@@ -38,7 +39,9 @@ private[spark] object SchemaUtils {
    */
   def discoverSchema(cfg: Settings): StructType = {
     val schema = discoverSchemaFromFe(cfg)
-    convertToStruct(cfg.getProperty(DORIS_READ_FIELD), schema)
+    val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
+    val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
+    convertToStruct(schema, dorisReadField, ignoreColumnType)
   }
 
   /**
@@ -57,14 +60,20 @@ private[spark] object SchemaUtils {
    * @param schema inner schema
    * @return Spark Catalyst StructType
    */
-  def convertToStruct(dorisReadFields: String, schema: Schema): StructType = {
-    val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) {
+  def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = {
+    val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
       dorisReadFields.split(",")
     } else {
       Array.empty[String]
     }
+    val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
+      ignoredTypes.split(",").map(t => t.trim.toUpperCase)
+    } else {
+      Array.empty[String]
+    }
     val fields = schema.getProperties
-      .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
+      .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
+        && !ignoredTypeList.contains(x.getType))
       .map(f =>
         DataTypes.createStructField(
           f.getName,
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index f944aaf..e11fb4f 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -41,7 +41,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
     fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
     val expected = DataTypes.createStructType(fields.asJava)
-    Assert.assertEquals(expected, SchemaUtils.convertToStruct("k1,k5", schema))
+    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5", null))
   }
 
   @Test
@@ -93,4 +93,24 @@ class TestSchemaUtils extends ExpectedExceptionTest {
 
     Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2)))
   }
+
+  @Test
+  def testIgnoreTypes(): Unit = {
+
+    val schema = new Schema
+    schema.setStatus(200)
+    val col1 = new Field("col1", "TINYINT", "", 0, 0, "")
+    val col2 = new Field("col2", "BITMAP", "", 0, 0, "")
+    val col3 = new Field("col3", "HLL", "", 0, 0, "")
+    schema.put(col1)
+    schema.put(col2)
+    schema.put(col3)
+
+    var fields = List[StructField]()
+    fields :+= DataTypes.createStructField("col1", DataTypes.ByteType, true)
+    val expected = DataTypes.createStructType(fields.asJava)
+    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, null, "bitmap,hll"))
+
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to