This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 3f148f0 [improve] support read variant type (#363) 3f148f0 is described below commit 3f148f050b7b07b1b7ebaa091d95fcf1a454be5c Author: wudi <676366...@qq.com> AuthorDate: Wed Apr 10 15:34:10 2024 +0800 [improve] support read variant type (#363) --- flink-doris-connector/pom.xml | 4 +- .../apache/doris/flink/serialization/RowBatch.java | 1 + .../doris/flink/serialization/TestRowBatch.java | 70 ++++++++++++++++++++++ 3 files changed, 73 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index a8014d0..beaa5b9 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -67,7 +67,7 @@ under the License. </mailingLists> <properties> - <revision>1.6.0-SNAPSHOT</revision> + <revision>1.6.1-SNAPSHOT</revision> <flink.version>1.18.0</flink.version> <flink.major.version>1.18</flink.major.version> <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version> @@ -79,7 +79,7 @@ under the License. 
<maven-source-plugin.version>3.2.1</maven-source-plugin.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.scm.id>github</project.scm.id> - <thrift-service.version>1.0.0</thrift-service.version> + <thrift-service.version>1.0.1</thrift-service.version> <checkstyle.version>8.14</checkstyle.version> <maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version> <spotless.version>2.4.2</spotless.version> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index ed649ca..c6dfa0e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -378,6 +378,7 @@ public class RowBatch { case "VARCHAR": case "STRING": case "JSONB": + case "VARIANT": if (!minorType.equals(Types.MinorType.VARCHAR)) { return false; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 6b29078..481004b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -885,4 +885,74 @@ public class TestRowBatch { thrown.expectMessage(startsWith("Get row offset:")); rowBatch.next(); } + + @Test + public void testVariant() throws DorisException, IOException { + + ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema( + childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + 
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(3); + + FieldVector vector = root.getVector("k1"); + VarCharVector datetimeVector = (VarCharVector) vector; + datetimeVector.setInitialCapacity(3); + datetimeVector.allocateNew(); + datetimeVector.setIndexDefined(0); + datetimeVector.setValueLengthSafe(0, 20); + datetimeVector.setSafe(0, "{\"id\":\"a\"}".getBytes()); + datetimeVector.setIndexDefined(1); + datetimeVector.setValueLengthSafe(1, 20); + datetimeVector.setSafe(1, "1000".getBytes()); + datetimeVector.setIndexDefined(2); + datetimeVector.setValueLengthSafe(2, 20); + datetimeVector.setSafe(2, "123.456".getBytes()); + vector.setValueCount(3); + + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[{\"type\":\"VARIANT\",\"name\":\"k\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); + + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow0 = rowBatch.next(); + Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0)); + + List<Object> actualRow1 = rowBatch.next(); + Assert.assertEquals("1000", actualRow1.get(0)); + + List<Object> actualRow2 = rowBatch.next(); + Assert.assertEquals("123.456", actualRow2.get(0)); + + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + } } 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org