This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4b761f3 [Feature](Connector) Use Array[String] instead of defaulting
to String (#362)
4b761f3 is described below
commit 4b761f379d58cbba32a3c0735ae5868d0862dc00
Author: Adesh Nalpet Adimurthy <[email protected]>
AuthorDate: Wed Jun 3 03:01:41 2026 -0400
[Feature](Connector) Use Array[String] instead of defaulting to String
(#362)
---
.../spark/client/read/AbstractThriftReader.java | 7 +-
.../spark/client/read/DorisFlightSqlReader.java | 4 +-
.../apache/doris/spark/client/read/RowBatch.java | 24 ++++-
.../apache/doris/spark/config/DorisOptions.java | 2 +
.../doris/spark/sql/DorisRowFlightSqlReader.scala | 2 +
.../doris/spark/sql/DorisRowThriftReader.scala | 2 +
.../doris/spark/sql/sources/DorisRelation.scala | 3 +-
.../apache/doris/spark/util/RowConvertors.scala | 12 ++-
.../apache/doris/spark/util/SchemaConvertors.scala | 6 +-
.../doris/spark/util/RowConvertorsTest.scala | 12 +++
.../doris/spark/util/SchemaConvertorsTest.scala | 8 +-
.../apache/doris/spark/sql/DorisReaderITCase.scala | 104 +++++++++++++++++++++
.../doris/spark/catalog/DorisTableBase.scala | 8 +-
13 files changed, 179 insertions(+), 15 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index 76ea2a9..3c37e5c 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -76,6 +76,8 @@ public abstract class AbstractThriftReader extends
DorisReader {
private final Boolean datetimeJava8ApiEnabled;
+ private final boolean arrayNativeType;
+
protected AbstractThriftReader(DorisReaderPartition partition) throws
Exception {
super(partition);
this.frontend = new DorisFrontendClient(config);
@@ -112,6 +114,7 @@ public abstract class AbstractThriftReader extends
DorisReader {
this.asyncThread = null;
}
this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
+ this.arrayNativeType =
config.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE);
}
private void runAsync() throws DorisException, InterruptedException {
@@ -128,7 +131,7 @@ public abstract class AbstractThriftReader extends
DorisReader {
});
endOfStream.set(nextResult.isEos());
if (!endOfStream.get()) {
- rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled);
+ rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled, arrayNativeType);
offset += rowBatch.getReadRowCount();
rowBatch.close();
rowBatchQueue.put(rowBatch);
@@ -187,7 +190,7 @@ public abstract class AbstractThriftReader extends
DorisReader {
});
endOfStream.set(nextResult.isEos());
if (!endOfStream.get()) {
- rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled);
+ rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled, arrayNativeType);
} else {
logger.info(
"Scan finished, tablets: {}, offset: {}",
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index b691f83..ef35554 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -63,6 +63,7 @@ public class DorisFlightSqlReader extends DorisReader {
private AdbcConnection connection;
private final ArrowReader arrowReader;
private final Boolean datetimeJava8ApiEnabled;
+ private final boolean arrayNativeType;
private int totalBatches = 0;
private long totalRows = 0;
@@ -89,6 +90,7 @@ public class DorisFlightSqlReader extends DorisReader {
this.schema = processDorisSchema(partition);
this.arrowReader = executeQuery();
this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
+ this.arrayNativeType =
config.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE);
}
@Override
@@ -111,7 +113,7 @@ public class DorisFlightSqlReader extends DorisReader {
throw new DorisException(e);
}
if (!endOfStream.get()) {
- rowBatch = new RowBatch(arrowReader, schema,
datetimeJava8ApiEnabled);
+ rowBatch = new RowBatch(arrowReader, schema,
datetimeJava8ApiEnabled, arrayNativeType);
}
}
return !endOfStream.get();
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index d937edd..fe5c59f 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -106,12 +106,19 @@ public class RowBatch implements Serializable {
private final Boolean datetimeJava8ApiEnabled;
+ private final boolean arrayNativeType;
+
public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean
datetimeJava8ApiEnabled) throws DorisException {
+ this(nextResult, schema, datetimeJava8ApiEnabled, false);
+ }
+
+ public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean
datetimeJava8ApiEnabled, boolean arrayNativeType) throws DorisException {
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
this.arrowReader = new ArrowStreamReader(new
ByteArrayInputStream(nextResult.getRows()), rootAllocator);
this.schema = schema;
this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
+ this.arrayNativeType = arrayNativeType;
try {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -128,10 +135,15 @@ public class RowBatch implements Serializable {
}
public RowBatch(ArrowReader reader, Schema schema, Boolean
datetimeJava8ApiEnabled) throws DorisException {
+ this(reader, schema, datetimeJava8ApiEnabled, false);
+ }
+
+ public RowBatch(ArrowReader reader, Schema schema, Boolean
datetimeJava8ApiEnabled, boolean arrayNativeType) throws DorisException {
this.arrowReader = reader;
this.schema = schema;
this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
+ this.arrayNativeType = arrayNativeType;
try {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -483,8 +495,16 @@ public class RowBatch implements Serializable {
addValueToRow(rowIndex, null);
continue;
}
- String value =
listVector.getObject(rowIndex).toString();
- addValueToRow(rowIndex, value);
+ List<?> rawList = listVector.getObject(rowIndex);
+ if (arrayNativeType) {
+ List<String> stringList = new
ArrayList<>(rawList.size());
+ for (Object item : rawList) {
+ stringList.add(item == null ? null :
item.toString());
+ }
+ addValueToRow(rowIndex, stringList);
+ } else {
+ addValueToRow(rowIndex, rawList.toString());
+ }
}
break;
case "MAP":
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 940f06b..1d77d63 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -142,6 +142,8 @@ public class DorisOptions {
public static final ConfigOption<Boolean> DORIS_READ_BITMAP_TO_BASE64 =
ConfigOptions.name("doris.read.bitmap-to-base64").booleanType().defaultValue(false).withDescription("");
+ public static final ConfigOption<Boolean> DORIS_READ_ARRAY_NATIVE_TYPE =
ConfigOptions.name("doris.read.array.native-type").booleanType().defaultValue(false).withDescription("If
true, Doris ARRAY columns are read as Spark ArrayType(StringType). If false
(default), they are read as a JSON-encoded String for backward compatibility.");
+
public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE =
ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 *
1024).withDescription("");
public static final ConfigOption<Boolean> DORIS_SINK_HTTP_UTF8_CHARSET =
ConfigOptions.name("doris.sink.http-utf8-charset").booleanType().defaultValue(false).withDescription("");
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
index 03796c6..c4b1268 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
@@ -36,6 +36,8 @@ class DorisRowFlightSqlReader(partition:
DorisReaderPartition) extends DorisFlig
rowBatch.next.asScala.zipWithIndex.foreach {
case (s, index) if index < row.values.size &&
s.isInstanceOf[java.util.HashMap[String, String]] =>
row.values.update(index, s.asInstanceOf[java.util.HashMap[String,
String]].asScala)
+ case (s, index) if index < row.values.size &&
s.isInstanceOf[java.util.List[_]] =>
+ row.values.update(index, s.asInstanceOf[java.util.List[_]].asScala)
case (s, index) if index < row.values.size => row.values.update(index, s)
case _ => // nothing
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
index 9c3cefa..6215c5b 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
@@ -36,6 +36,8 @@ class DorisRowThriftReader(partition: DorisReaderPartition)
extends DorisThriftR
rowBatch.next.asScala.zipWithIndex.foreach {
case (s, index) if index < row.values.size &&
s.isInstanceOf[java.util.HashMap[String, String]] =>
row.values.update(index, s.asInstanceOf[java.util.HashMap[String,
String]].asScala)
+ case (s, index) if index < row.values.size &&
s.isInstanceOf[java.util.List[_]] =>
+ row.values.update(index, s.asInstanceOf[java.util.List[_]].asScala)
case (s, index) if index < row.values.size => row.values.update(index, s)
case _ => // nothing
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
index 55249e9..6fd7af5 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
@@ -45,8 +45,9 @@ private[sql] class DorisRelation(
val tableIdentifier = cfg.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
val tableIdentifierArr =
tableIdentifier.split("\\.").map(_.replaceAll("`", ""))
val dorisSchema = frontend.getTableSchema(tableIdentifierArr(0),
tableIdentifierArr(1))
+ val arrayNativeType =
cfg.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE)
StructType(dorisSchema.getProperties.asScala.map(field => {
- StructField(field.getName,
SchemaConvertors.toCatalystType(field.getType, field.getPrecision,
field.getScale))
+ StructField(field.getName,
SchemaConvertors.toCatalystType(field.getType, field.getPrecision,
field.getScale, arrayNativeType))
}))
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index 2d8b4d9..ba9d46f 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils,
GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -131,6 +131,16 @@ object RowConvertors {
val keys = map.keys.toArray.map(UTF8String.fromString)
val values = map.values.toArray.map(UTF8String.fromString)
ArrayBasedMapData(keys, values)
+ case at: ArrayType =>
+ val list = v.asInstanceOf[java.util.List[_]]
+ val elements = new Array[Any](list.size())
+ var i = 0
+ while (i < list.size()) {
+ val elem = list.get(i)
+ elements(i) = if (elem == null) null else convertValue(elem,
at.elementType, datetimeJava8ApiEnabled)
+ i += 1
+ }
+ new GenericArrayData(elements)
case NullType | BooleanType | ByteType | ShortType | IntegerType |
LongType | FloatType | DoubleType | BinaryType | _: DecimalType => v
case _ => throw new Exception(s"Unsupported spark type:
${dataType.typeName}")
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
index 91b8317..8285064 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
@@ -20,12 +20,12 @@ package org.apache.doris.spark.util
import org.apache.doris.sdk.thrift.TScanColumnDesc
import org.apache.doris.spark.rest.models.{Field, Schema}
-import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, MapType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes,
DecimalType, MapType}
object SchemaConvertors {
@throws[IllegalArgumentException]
- def toCatalystType(dorisType: String, precision: Int, scale: Int): DataType
= {
+ def toCatalystType(dorisType: String, precision: Int, scale: Int,
arrayNativeType: Boolean = false): DataType = {
dorisType match {
case "NULL_TYPE" => DataTypes.NullType
case "BOOLEAN" => DataTypes.BooleanType
@@ -52,7 +52,7 @@ object SchemaConvertors {
case "DECIMAL128" => DecimalType(precision, scale)
case "TIME" => DataTypes.DoubleType
case "STRING" => DataTypes.StringType
- case "ARRAY" => DataTypes.StringType
+ case "ARRAY" => if (arrayNativeType) ArrayType(DataTypes.StringType,
containsNull = true) else DataTypes.StringType
case "MAP" => MapType(DataTypes.StringType, DataTypes.StringType)
case "STRUCT" => DataTypes.StringType
case "VARIANT" => DataTypes.StringType
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index 5a08a32..6ec4712 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -119,6 +119,18 @@ class RowConvertorsTest {
Assert.assertTrue(RowConvertors.convertValue(map,
MapType(DataTypes.StringType, DataTypes.StringType),
false).isInstanceOf[MapData])
Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType,
false).isInstanceOf[UTF8String])
+ val list = new util.ArrayList[String]()
+ list.add("a")
+ list.add(null)
+ list.add("b")
+ val arrayValue = RowConvertors.convertValue(list,
ArrayType(DataTypes.StringType), false)
+ Assert.assertTrue(arrayValue.isInstanceOf[ArrayData])
+ val arrayData = arrayValue.asInstanceOf[ArrayData]
+ Assert.assertEquals(3, arrayData.numElements())
+ Assert.assertEquals(UTF8String.fromString("a"), arrayData.getUTF8String(0))
+ Assert.assertTrue(arrayData.isNullAt(1))
+ Assert.assertEquals(UTF8String.fromString("b"), arrayData.getUTF8String(2))
+
}
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
index b259bc1..f014b5f 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/SchemaConvertorsTest.scala
@@ -18,7 +18,7 @@
package org.apache.doris.spark.util
-import org.apache.spark.sql.types.{DataTypes, DecimalType, MapType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, MapType}
import org.junit.Assert
import org.junit.jupiter.api.Test
@@ -63,4 +63,10 @@ class SchemaConvertorsTest {
}
+ @Test
+ def toCatalystTypeArrayNativeTypeTest(): Unit = {
+ Assert.assertEquals(SchemaConvertors.toCatalystType("ARRAY", -1, -1,
true), ArrayType(DataTypes.StringType, true))
+ Assert.assertEquals(SchemaConvertors.toCatalystType("ARRAY", -1, -1,
false), DataTypes.StringType)
+ }
+
}
diff --git
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 2ed6188..8815714 100644
---
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -562,6 +562,110 @@ class DorisReaderITCase(readMode: String, flightSqlPort:
Int) extends AbstractCo
}
}
+ @Test
+ def testReadArrayNativeType(): Unit = {
+ val sourceInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG,
sourceInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+ | "doris.read.array.native-type"="true"
+ |)
+ |""".stripMargin)
+
+ val schemaType = session.sql("select c15 from
test_source").schema.fields(0).dataType.typeName
+ assert("array".equals(schemaType))
+
+ val sizes = session.sql(
+ """
+ |select id, size(c15) as sz from test_source where c15 is not null
order by id
+ |""".stripMargin).collect().toList.toString()
+ assert("List([1,2], [2,2], [3,2])".equals(sizes))
+
+ val arrayRows = session.sql(
+ """
+ |select id, c15 from test_source order by id
+ |""".stripMargin).collect()
+
+ assert(arrayRows(0).getList[String](1).toString == "[Alice, Bob]")
+ assert(arrayRows(1).getList[String](1).toString == "[Charlie, David]")
+ assert(arrayRows(2).getList[String](1).toString == "[Eve, Frank]")
+ assert(arrayRows(3).isNullAt(1))
+ } finally {
+ session.stop()
+ }
+ }
+
+ @Test
+ def testDataFrameSourceArrayNativeType(): Unit = {
+ val sourceInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG,
sourceInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val dorisSparkDF = session.read
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." +
TABLE_READ_TBL_ALL_TYPES)
+ .option("doris.user", getDorisUsername)
+ .option("doris.password", getDorisPassword)
+ .option("doris.read.mode", readMode)
+ .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString)
+ .option("doris.read.array.native-type", "true")
+ .load()
+
+ assert("array".equals(dorisSparkDF.schema("c15").dataType.typeName))
+
+ val sizes = dorisSparkDF
+ .where("c15 is not null")
+ .selectExpr("id", "size(c15) as sz")
+ .orderBy("id")
+ .collect().toList.toString()
+ assert("List([1,2], [2,2], [3,2])".equals(sizes))
+ } finally {
+ session.stop()
+ }
+ }
+
+ @Test
+ def testReadArrayDefaultsToString(): Unit = {
+ val sourceInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG,
sourceInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+
+ val schemaType = session.sql("select c15 from
test_source").schema.fields(0).dataType.typeName
+ assert("string".equals(schemaType))
+ } finally {
+ session.stop()
+ }
+ }
+
@Test
def testExpressionNotPushDown(): Unit = {
val sourceInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/read_filter_pushdown.sql")
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 2df410b..214732f 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.language.implicitConversions
abstract class DorisTableBase(identifier: Identifier, config: DorisConfig,
schema: Option[StructType]) extends Table with SupportsRead with SupportsWrite {
@@ -42,7 +41,7 @@ abstract class DorisTableBase(identifier: Identifier, config:
DorisConfig, schem
override def schema(): StructType = schema.getOrElse({
val dorisSchema = frontend.getTableSchema(identifier.namespace()(0),
identifier.name())
- dorisSchema
+ dorisSchemaToStructType(dorisSchema)
})
override def capabilities(): util.Set[TableCapability] = {
@@ -69,9 +68,10 @@ abstract class DorisTableBase(identifier: Identifier,
config: DorisConfig, schem
createWriteBuilder(config, logicalWriteInfo.schema())
}
- private implicit def dorisSchemaToStructType(dorisSchema: Schema):
StructType = {
+ private def dorisSchemaToStructType(dorisSchema: Schema): StructType = {
+ val arrayNativeType =
config.getValue(DorisOptions.DORIS_READ_ARRAY_NATIVE_TYPE)
StructType(dorisSchema.getProperties.asScala.map(field => {
- StructField(field.getName,
SchemaConvertors.toCatalystType(field.getType, field.getPrecision,
field.getScale))
+ StructField(field.getName,
SchemaConvertors.toCatalystType(field.getType, field.getPrecision,
field.getScale, arrayNativeType))
}))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]