This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 06aaddd  [fix](connector) fix read map type convert error (#267)
06aaddd is described below

commit 06aaddddb3e23d5a4e3fde11f7949749726c9f4b
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Thu Feb 20 11:33:13 2025 +0800

    [fix](connector) fix read map type convert error (#267)
---
 .../main/java/org/apache/doris/spark/client/read/RowBatch.java   | 3 +--
 .../main/scala/org/apache/doris/spark/util/RowConvertors.scala   | 8 ++++----
 .../java/org/apache/doris/spark/client/read/RowBatchTest.java    | 9 +++------
 .../scala/org/apache/doris/spark/util/RowConvertorsTest.scala    | 5 ++++-
 .../scala/org/apache/doris/spark/read/DorisPartitionReader.scala | 6 +++++-
 5 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 6b06b37..759678d 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -51,7 +51,6 @@ import org.apache.doris.spark.util.IPUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -484,7 +483,7 @@ public class RowBatch implements Serializable {
                         value.put(Objects.toString(reader.key().readObject(), null),
                                 Objects.toString(reader.value().readObject(), null));
                     }
-                    addValueToRow(rowIndex, JavaConverters.mapAsScalaMapConverter(value).asScala());
+                    addValueToRow(rowIndex, value);
                 }
                 break;
             case "STRUCT":
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index dd8a7d5..31b7196 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -23,12 +23,12 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.sql.{Date, Timestamp}
 import java.time.LocalDate
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 import scala.collection.mutable
 
 object RowConvertors {
@@ -114,9 +114,9 @@ object RowConvertors {
       case DateType if datetimeJava8ApiEnabled => v.asInstanceOf[LocalDate].toEpochDay.toInt
       case DateType => DateTimeUtils.fromJavaDate(v.asInstanceOf[Date])
       case _: MapType =>
-        val map = v.asInstanceOf[Map[String, String]]
-        val keys = map.keys.toArray
-        val values = map.values.toArray
+        val map = v.asInstanceOf[java.util.Map[String, String]].asScala
+        val keys = map.keys.toArray.map(UTF8String.fromString)
+        val values = map.values.toArray.map(UTF8String.fromString)
         ArrayBasedMapData(keys, values)
       case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _:DecimalType => v
       case _ => throw new Exception(s"Unsupported spark type: ${dataType.typeName}")
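Why this fixes the read path: RowBatch previously wrapped the decoded java.util.Map in a Scala wrapper via JavaConverters before storing it in the row, while the old MapType branch in RowConvertors cast the incoming value to scala.Predef.Map (the immutable Map alias) and built ArrayBasedMapData from plain java.lang.String keys and values. The likely failure mode, sketched below as a hypothetical standalone Scala snippet (not code from this repository): asScala yields a scala.collection.mutable.Map wrapper, so the downstream cast to an immutable Map fails at runtime.

    import scala.collection.JavaConverters.mapAsScalaMapConverter

    val javaMap = new java.util.HashMap[String, String]()
    javaMap.put("k1", "0")

    val wrapped = javaMap.asScala               // scala.collection.mutable.Map wrapper
    // The old convertValue branch effectively performed this cast, where Map
    // is scala.Predef.Map, i.e. scala.collection.immutable.Map:
    wrapped.asInstanceOf[Map[String, String]]   // throws ClassCastException

The fix removes the wrapper on the Java side and teaches convertValue to accept the java.util.Map directly, so the conversion happens exactly once, in the place where the Catalyst types are known.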
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 7b0cca7..05123b4 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -688,14 +688,11 @@ public class RowBatchTest {
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
-        Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k1", "0")).asScala(),
-                rowBatch.next().get(0));
+        Assert.assertEquals(ImmutableMap.of("k1", "0"), rowBatch.next().get(0));
         Assert.assertTrue(rowBatch.hasNext());
-        Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k2", "1")).asScala(),
-                rowBatch.next().get(0));
+        Assert.assertEquals(ImmutableMap.of("k2", "1"), rowBatch.next().get(0));
         Assert.assertTrue(rowBatch.hasNext());
-        Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k3", "2")).asScala(),
-                rowBatch.next().get(0));
+        Assert.assertEquals(ImmutableMap.of("k3", "2"), rowBatch.next().get(0));
         Assert.assertFalse(rowBatch.hasNext());
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index ce36c06..31a787f 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test
 
 import java.sql.{Date, Timestamp}
 import java.time.LocalDate
+import java.util
 
 class RowConvertorsTest {
@@ -113,7 +114,9 @@
     Assert.assertTrue(RowConvertors.convertValue(Date.valueOf("2024-01-01"), DataTypes.DateType, false).isInstanceOf[Int])
     Assert.assertTrue(RowConvertors.convertValue(LocalDate.now(), DataTypes.DateType, true).isInstanceOf[Int])
     Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56", DataTypes.TimestampType, false).isInstanceOf[Long])
-    Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" -> "1"), MapType(DataTypes.StringType, DataTypes.StringType), false).isInstanceOf[MapData])
+    val map = new util.HashMap[String, String]()
+    map.put("a", "1")
+    Assert.assertTrue(RowConvertors.convertValue(map, MapType(DataTypes.StringType, DataTypes.StringType), false).isInstanceOf[MapData])
     Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType, false).isInstanceOf[UTF8String])
   }
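The updated tests pin down the new contract: RowBatch.next() now returns the plain java.util.Map, and convertValue is responsible for producing Catalyst MapData from it. A minimal sketch of what the fixed MapType branch computes, mirroring the diff above (standalone snippet, assuming Spark's catalyst classes on the classpath):

    import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
    import org.apache.spark.unsafe.types.UTF8String

    import scala.collection.JavaConverters.mapAsScalaMapConverter

    val javaMap = new java.util.HashMap[String, String]()
    javaMap.put("a", "1")

    // Catalyst expects UTF8String for StringType entries; storing raw
    // java.lang.String inside MapData breaks downstream readers.
    val scalaMap = javaMap.asScala
    val keys = scalaMap.keys.toArray.map(UTF8String.fromString)
    val values = scalaMap.values.toArray.map(UTF8String.fromString)
    val mapData = ArrayBasedMapData(keys, values)

Wrapping every key and value in UTF8String is the second half of the fix: the old branch built ArrayBasedMapData over plain strings, which is not a valid Catalyst representation for MapType(StringType, StringType).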
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
index 0061fff..9cf2fd6 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
@@ -56,7 +56,11 @@ class DorisPartitionReader(inputPartition: InputPartition, schema: StructType, m
       values.zipWithIndex.foreach { case (value, index) =>
         if (value == null) row.setNullAt(index)
-        else row.update(index, RowConvertors.convertValue(value, schema.fields(index).dataType, datetimeJava8ApiEnabled))
+        else {
+          val dataType = schema.fields(index).dataType
+          val catalystValue = RowConvertors.convertValue(value, dataType, datetimeJava8ApiEnabled)
+          row.update(index, catalystValue)
+        }
       }
     }
     row
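The reader-side change is a pure readability refactor: the one-liner is unpacked into named locals, with behavior unchanged. For context, a self-contained sketch of the conversion loop (the row implementation is not shown in this hunk; GenericInternalRow below is an assumption for illustration):

    import org.apache.doris.spark.util.RowConvertors
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.types.StructType

    def toInternalRow(values: Array[Any], schema: StructType,
                      datetimeJava8ApiEnabled: Boolean): InternalRow = {
      val row = new GenericInternalRow(schema.fields.length)
      values.zipWithIndex.foreach { case (value, index) =>
        if (value == null) row.setNullAt(index)
        else {
          // Convert each external value (e.g. the java.util.Map above) into
          // its Catalyst form before storing it in the InternalRow.
          val dataType = schema.fields(index).dataType
          row.update(index, RowConvertors.convertValue(value, dataType, datetimeJava8ApiEnabled))
        }
      }
      row
    }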