This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 37c5175  [feature] support variant type (#197)
37c5175 is described below

commit 37c51754299ca8b6ebd0cd83d3a9c1c86cb26c4d
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Apr 10 15:32:51 2024 +0800

    [feature] support variant type (#197)
---
 spark-doris-connector/pom.xml                      |  2 +-
 .../apache/doris/spark/serialization/RowBatch.java |  1 +
 .../org/apache/doris/spark/sql/SchemaUtils.scala   |  1 +
 .../doris/spark/serialization/TestRowBatch.java    | 71 ++++++++++++++++++++++
 4 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index f5dce80..fef519d 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -77,7 +77,7 @@
         <project.scm.id>github</project.scm.id>
         <netty.version>4.1.77.Final</netty.version>
         <fasterxml.jackson.version>2.13.5</fasterxml.jackson.version>
-        <thrift-service.version>1.0.0</thrift-service.version>
+        <thrift-service.version>1.0.1</thrift-service.version>
         <testcontainers.version>1.17.6</testcontainers.version>
     </properties>
 
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index 7c28f76..8fd84d3 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -379,6 +379,7 @@ public class RowBatch {
                     case "VARCHAR":
                     case "STRING":
                     case "JSONB":
+                    case "VARIANT":
                         Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
                                 typeMismatchMessage(currentType, mt));
                         VarCharVector varCharVector = (VarCharVector) curFieldVector;
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index e298349..914190a 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -129,6 +129,7 @@ private[spark] object SchemaUtils {
       case "ARRAY"           => DataTypes.StringType
       case "MAP"             => MapType(DataTypes.StringType, DataTypes.StringType)
       case "STRUCT"          => DataTypes.StringType
+      case "VARIANT"         => DataTypes.StringType
       case "HLL"             =>
         throw new DorisException("Unsupported type " + dorisType)
       case _                             =>
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index 1cf4136..348895d 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -850,4 +850,75 @@ public class TestRowBatch {
 
     }
 
+    @Test
+    public void testVariant() throws DorisException, IOException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null));
+
+        VectorSchemaRoot root = VectorSchemaRoot.create(
+                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+                new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+                root,
+                new DictionaryProvider.MapDictionaryProvider(),
+                outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector datetimeVector = (VarCharVector)vector;
+        datetimeVector.setInitialCapacity(3);
+        datetimeVector.allocateNew();
+        datetimeVector.setIndexDefined(0);
+        datetimeVector.setValueLengthSafe(0, 20);
+        datetimeVector.setSafe(0, "{\"id\":\"a\"}".getBytes());
+        datetimeVector.setIndexDefined(1);
+        datetimeVector.setValueLengthSafe(1, 20);
+        datetimeVector.setSafe(1, "1000".getBytes());
+        datetimeVector.setIndexDefined(2);
+        datetimeVector.setValueLengthSafe(2, 20);
+        datetimeVector.setSafe(2, "123.456".getBytes());
+        vector.setValueCount(3);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+
+        String schemaStr = "{\"properties\":[" +
+                "{\"type\":\"VARIANT\",\"name\":\"k\",\"comment\":\"\"}" +
+                "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
+
+        List<Object> actualRow1 = rowBatch.next();
+        Assert.assertEquals("1000", actualRow1.get(0));
+
+        List<Object> actualRow2 = rowBatch.next();
+        Assert.assertEquals("123.456", actualRow2.get(0));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to