This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 06aaddd  [fix](connector) fix read map type convert error (#267)
06aaddd is described below

commit 06aaddddb3e23d5a4e3fde11f7949749726c9f4b
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Thu Feb 20 11:33:13 2025 +0800

    [fix](connector) fix read map type convert error (#267)
---
 .../main/java/org/apache/doris/spark/client/read/RowBatch.java   | 3 +--
 .../main/scala/org/apache/doris/spark/util/RowConvertors.scala   | 8 ++++----
 .../java/org/apache/doris/spark/client/read/RowBatchTest.java    | 9 +++------
 .../scala/org/apache/doris/spark/util/RowConvertorsTest.scala    | 5 ++++-
 .../scala/org/apache/doris/spark/read/DorisPartitionReader.scala | 6 +++++-
 5 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 6b06b37..759678d 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -51,7 +51,6 @@ import org.apache.doris.spark.util.IPUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -484,7 +483,7 @@ public class RowBatch implements Serializable {
                                 value.put(Objects.toString(reader.key().readObject(), null),
                                         Objects.toString(reader.value().readObject(), null));
                             }
-                            addValueToRow(rowIndex, JavaConverters.mapAsScalaMapConverter(value).asScala());
+                            addValueToRow(rowIndex, value);
                         }
                         break;
                     case "STRUCT":
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index dd8a7d5..31b7196 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -23,12 +23,12 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.sql.{Date, Timestamp}
 import java.time.LocalDate
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 import scala.collection.mutable
 
 object RowConvertors {
@@ -114,9 +114,9 @@ object RowConvertors {
      case DateType if datetimeJava8ApiEnabled => v.asInstanceOf[LocalDate].toEpochDay.toInt
       case DateType => DateTimeUtils.fromJavaDate(v.asInstanceOf[Date])
       case _: MapType =>
-        val map = v.asInstanceOf[Map[String, String]]
-        val keys = map.keys.toArray
-        val values = map.values.toArray
+        val map = v.asInstanceOf[java.util.Map[String, String]].asScala
+        val keys = map.keys.toArray.map(UTF8String.fromString)
+        val values = map.values.toArray.map(UTF8String.fromString)
         ArrayBasedMapData(keys, values)
      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _:DecimalType => v
      case _ => throw new Exception(s"Unsupported spark type: ${dataType.typeName}")
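
A self-contained sketch of the fixed MapType branch (Spark 3.x Catalyst classes assumed
on the classpath; the method name here is illustrative, not the connector's): keys and
values must be wrapped in UTF8String because that is how Catalyst represents strings
inside an InternalRow.

import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConverters.mapAsScalaMapConverter

def toCatalystMap(v: Any): ArrayBasedMapData = {
  // Accept the raw java.util.Map handed over by RowBatch and convert it once.
  val map = v.asInstanceOf[java.util.Map[String, String]].asScala
  val keys = map.keys.toArray.map(UTF8String.fromString)
  val values = map.values.toArray.map(UTF8String.fromString)
  ArrayBasedMapData(keys, values)
}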
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 7b0cca7..05123b4 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -688,14 +688,11 @@ public class RowBatchTest {
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
-        Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k1", "0")).asScala(),
-                rowBatch.next().get(0));
+        Assert.assertEquals(ImmutableMap.of("k1", "0"), rowBatch.next().get(0));
         Assert.assertTrue(rowBatch.hasNext());
-        Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k2", "1")).asScala(),
-                rowBatch.next().get(0));
+        Assert.assertEquals(ImmutableMap.of("k2", "1"), rowBatch.next().get(0));
         Assert.assertTrue(rowBatch.hasNext());
-        Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k3", "2")).asScala(),
-                rowBatch.next().get(0));
+        Assert.assertEquals(ImmutableMap.of("k3", "2"), rowBatch.next().get(0));
         Assert.assertFalse(rowBatch.hasNext());
 
     }
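
The updated assertions lean on java.util.Map's entry-based equality, which holds across
implementations; a quick sketch (Guava assumed on the test classpath):

import com.google.common.collect.ImmutableMap

val produced = new java.util.HashMap[String, String]()
produced.put("k1", "0")
// Map.equals compares entry sets, so a Guava ImmutableMap equals the HashMap
// that RowBatch now stores in the row.
assert(ImmutableMap.of("k1", "0") == produced)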
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index ce36c06..31a787f 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test
 
 import java.sql.{Date, Timestamp}
 import java.time.LocalDate
+import java.util
 
 class RowConvertorsTest {
 
@@ -113,7 +114,9 @@ class RowConvertorsTest {
     Assert.assertTrue(RowConvertors.convertValue(Date.valueOf("2024-01-01"), DataTypes.DateType, false).isInstanceOf[Int])
     Assert.assertTrue(RowConvertors.convertValue(LocalDate.now(), DataTypes.DateType, true).isInstanceOf[Int])
     Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56", DataTypes.TimestampType, false).isInstanceOf[Long])
-    Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" -> "1"), MapType(DataTypes.StringType, DataTypes.StringType), false).isInstanceOf[MapData])
+    val map = new util.HashMap[String, String]()
+    map.put("a", "1")
+    Assert.assertTrue(RowConvertors.convertValue(map, MapType(DataTypes.StringType, DataTypes.StringType), false).isInstanceOf[MapData])
     Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType, false).isInstanceOf[UTF8String])
 
   }
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
index 0061fff..9cf2fd6 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
@@ -56,7 +56,11 @@ class DorisPartitionReader(inputPartition: InputPartition, schema: StructType, m
       values.zipWithIndex.foreach {
         case (value, index) =>
           if (value == null) row.setNullAt(index)
-          else row.update(index, RowConvertors.convertValue(value, schema.fields(index).dataType, datetimeJava8ApiEnabled))
+          else {
+            val dataType = schema.fields(index).dataType
+            val catalystValue = RowConvertors.convertValue(value, dataType, datetimeJava8ApiEnabled)
+            row.update(index, catalystValue)
+          }
       }
     }
     row
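
How a map column now flows through the reader, as a rough usage sketch (illustrative
values; RowConvertors is the connector's converter, everything else here is assumed):

import org.apache.doris.spark.util.RowConvertors
import org.apache.spark.sql.types.{DataTypes, MapType}

val fieldValue = new java.util.HashMap[String, String]()
fieldValue.put("k1", "0")

val mapType = MapType(DataTypes.StringType, DataTypes.StringType)
// With the fix, convertValue takes the java.util.Map directly and returns
// Catalyst MapData, ready for row.update(index, ...).
val catalystValue = RowConvertors.convertValue(fieldValue, mapType, false)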

