This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 37c5175 [feature] support variant type (#197) 37c5175 is described below commit 37c51754299ca8b6ebd0cd83d3a9c1c86cb26c4d Author: gnehil <adamlee...@gmail.com> AuthorDate: Wed Apr 10 15:32:51 2024 +0800 [feature] support variant type (#197) --- spark-doris-connector/pom.xml | 2 +- .../apache/doris/spark/serialization/RowBatch.java | 1 + .../org/apache/doris/spark/sql/SchemaUtils.scala | 1 + .../doris/spark/serialization/TestRowBatch.java | 71 ++++++++++++++++++++++ 4 files changed, 74 insertions(+), 1 deletion(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index f5dce80..fef519d 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -77,7 +77,7 @@ <project.scm.id>github</project.scm.id> <netty.version>4.1.77.Final</netty.version> <fasterxml.jackson.version>2.13.5</fasterxml.jackson.version> - <thrift-service.version>1.0.0</thrift-service.version> + <thrift-service.version>1.0.1</thrift-service.version> <testcontainers.version>1.17.6</testcontainers.version> </properties> diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 7c28f76..8fd84d3 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -379,6 +379,7 @@ public class RowBatch { case "VARCHAR": case "STRING": case "JSONB": + case "VARIANT": Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), typeMismatchMessage(currentType, mt)); VarCharVector varCharVector = (VarCharVector) curFieldVector; diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index e298349..914190a 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -129,6 +129,7 @@ private[spark] object SchemaUtils { case "ARRAY" => DataTypes.StringType case "MAP" => MapType(DataTypes.StringType, DataTypes.StringType) case "STRUCT" => DataTypes.StringType + case "VARIANT" => DataTypes.StringType case "HLL" => throw new DorisException("Unsupported type " + dorisType) case _ => diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java index 1cf4136..348895d 100644 --- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java @@ -850,4 +850,75 @@ public class TestRowBatch { } + @Test + public void testVariant() throws DorisException, IOException { + + ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); + + VectorSchemaRoot root = VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( + root, + new DictionaryProvider.MapDictionaryProvider(), + outputStream); + + arrowStreamWriter.start(); + root.setRowCount(3); + + FieldVector vector = root.getVector("k1"); + VarCharVector datetimeVector = (VarCharVector)vector; + datetimeVector.setInitialCapacity(3); + datetimeVector.allocateNew(); + datetimeVector.setIndexDefined(0); + datetimeVector.setValueLengthSafe(0, 20); + datetimeVector.setSafe(0, "{\"id\":\"a\"}".getBytes()); + datetimeVector.setIndexDefined(1); + datetimeVector.setValueLengthSafe(1, 20); + datetimeVector.setSafe(1, "1000".getBytes()); + datetimeVector.setIndexDefined(2); + datetimeVector.setValueLengthSafe(2, 20); + datetimeVector.setSafe(2, "123.456".getBytes()); + vector.setValueCount(3); + + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + + String schemaStr = "{\"properties\":[" + + "{\"type\":\"VARIANT\",\"name\":\"k\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow0 = rowBatch.next(); + Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0)); + + List<Object> actualRow1 = rowBatch.next(); + Assert.assertEquals("1000", actualRow1.get(0)); + + List<Object> actualRow2 = rowBatch.next(); + Assert.assertEquals("123.456", actualRow2.get(0)); + + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org