This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b5fbc4 [source]Add compatibility for IP data types after Doris 2.1.3 (#576)
39b5fbc4 is described below

commit 39b5fbc4ba354b1f3f557e7819cf47596bb20972
Author: Petrichor <1401597...@qq.com>
AuthorDate: Mon Mar 17 19:15:15 2025 +0800

    [source]Add compatibility for IP data types after Doris 2.1.3  (#576)
---
 .../apache/doris/flink/serialization/RowBatch.java | 109 ++++----
 .../doris/flink/serialization/TestRowBatch.java    | 292 ++++++++++-----------
 2 files changed, 200 insertions(+), 201 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index ddaf6600..f012b9c6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -46,7 +46,7 @@ import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.models.Schema;
@@ -177,7 +177,7 @@ public class RowBatch {
                     throw new DorisException(
                             "Load Doris data failed, schema size of fetch data 
is wrong.");
                 }
-                if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
+                if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
                     logger.debug("One batch in arrow has no data.");
                     continue;
                 }
@@ -217,12 +217,12 @@ public class RowBatch {
         try {
             for (int col = 0; col < fieldVectors.size(); col++) {
                 FieldVector fieldVector = fieldVectors.get(col);
-                Types.MinorType minorType = fieldVector.getMinorType();
+                MinorType minorType = fieldVector.getMinorType();
                 final String currentType = schema.get(col).getType();
                 for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
                     boolean passed = doConvert(col, rowIndex, minorType, 
currentType, fieldVector);
                     if (!passed) {
-                        throw new java.lang.IllegalArgumentException(
+                        throw new IllegalArgumentException(
                                 "FLINK type is "
                                         + currentType
                                         + ", but arrow type is "
@@ -239,17 +239,13 @@ public class RowBatch {
 
     @VisibleForTesting
     public boolean doConvert(
-            int col,
-            int rowIndex,
-            Types.MinorType minorType,
-            String currentType,
-            FieldVector fieldVector)
+            int col, int rowIndex, MinorType minorType, String currentType, 
FieldVector fieldVector)
             throws DorisException {
         switch (currentType) {
             case "NULL_TYPE":
                 break;
             case "BOOLEAN":
-                if (!minorType.equals(Types.MinorType.BIT)) {
+                if (!minorType.equals(MinorType.BIT)) {
                     return false;
                 }
                 BitVector bitVector = (BitVector) fieldVector;
@@ -258,7 +254,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "TINYINT":
-                if (!minorType.equals(Types.MinorType.TINYINT)) {
+                if (!minorType.equals(MinorType.TINYINT)) {
                     return false;
                 }
                 TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
@@ -266,7 +262,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "SMALLINT":
-                if (!minorType.equals(Types.MinorType.SMALLINT)) {
+                if (!minorType.equals(MinorType.SMALLINT)) {
                     return false;
                 }
                 SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
@@ -274,7 +270,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "INT":
-                if (!minorType.equals(Types.MinorType.INT)) {
+                if (!minorType.equals(MinorType.INT)) {
                     return false;
                 }
                 IntVector intVector = (IntVector) fieldVector;
@@ -282,25 +278,39 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "IPV4":
-                if (!minorType.equals(Types.MinorType.UINT4)
-                        && !minorType.equals(Types.MinorType.INT)) {
+                if (!minorType.equals(MinorType.UINT4)
+                        && !minorType.equals(MinorType.INT)
+                        && !minorType.equals(MinorType.VARCHAR)) {
                     return false;
                 }
-                BaseIntVector ipv4Vector;
-                if (minorType.equals(Types.MinorType.INT)) {
-                    ipv4Vector = (IntVector) fieldVector;
 
+                if (fieldVector.isNull(rowIndex)) {
+                    addValueToRow(rowIndex, null);
+                    break;
+                }
+
+                if (minorType.equals(MinorType.VARCHAR)) {
+                    VarCharVector ipv4VarcharVector = (VarCharVector) 
fieldVector;
+                    String ipv4Str =
+                            new String(ipv4VarcharVector.get(rowIndex), 
StandardCharsets.UTF_8);
+                    addValueToRow(rowIndex, ipv4Str);
                 } else {
-                    ipv4Vector = (UInt4Vector) fieldVector;
+                    BaseIntVector ipv4Vector;
+                    if (minorType.equals(MinorType.INT)) {
+                        ipv4Vector = (IntVector) fieldVector;
+
+                    } else {
+                        ipv4Vector = (UInt4Vector) fieldVector;
+                    }
+                    fieldValue =
+                            ipv4Vector.isNull(rowIndex)
+                                    ? null
+                                    : 
convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+                    addValueToRow(rowIndex, fieldValue);
                 }
-                fieldValue =
-                        ipv4Vector.isNull(rowIndex)
-                                ? null
-                                : 
convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
-                addValueToRow(rowIndex, fieldValue);
                 break;
             case "BIGINT":
-                if (!minorType.equals(Types.MinorType.BIGINT)) {
+                if (!minorType.equals(MinorType.BIGINT)) {
                     return false;
                 }
                 BigIntVector bigIntVector = (BigIntVector) fieldVector;
@@ -308,7 +318,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "FLOAT":
-                if (!minorType.equals(Types.MinorType.FLOAT4)) {
+                if (!minorType.equals(MinorType.FLOAT4)) {
                     return false;
                 }
                 Float4Vector float4Vector = (Float4Vector) fieldVector;
@@ -317,7 +327,7 @@ public class RowBatch {
                 break;
             case "TIME":
             case "DOUBLE":
-                if (!minorType.equals(Types.MinorType.FLOAT8)) {
+                if (!minorType.equals(MinorType.FLOAT8)) {
                     return false;
                 }
                 Float8Vector float8Vector = (Float8Vector) fieldVector;
@@ -325,7 +335,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "BINARY":
-                if (!minorType.equals(Types.MinorType.VARBINARY)) {
+                if (!minorType.equals(MinorType.VARBINARY)) {
                     return false;
                 }
                 VarBinaryVector varBinaryVector = (VarBinaryVector) 
fieldVector;
@@ -339,7 +349,7 @@ public class RowBatch {
             case "DECIMAL64":
             case "DECIMAL128I":
             case "DECIMAL128":
-                if (!minorType.equals(Types.MinorType.DECIMAL)) {
+                if (!minorType.equals(MinorType.DECIMAL)) {
                     return false;
                 }
                 DecimalVector decimalVector = (DecimalVector) fieldVector;
@@ -352,11 +362,10 @@ public class RowBatch {
                 break;
             case "DATE":
             case "DATEV2":
-                if (!minorType.equals(Types.MinorType.DATEDAY)
-                        && !minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(MinorType.DATEDAY) && 
!minorType.equals(MinorType.VARCHAR)) {
                     return false;
                 }
-                if (minorType.equals(Types.MinorType.VARCHAR)) {
+                if (minorType.equals(MinorType.VARCHAR)) {
                     VarCharVector date = (VarCharVector) fieldVector;
                     if (date.isNull(rowIndex)) {
                         addValueToRow(rowIndex, null);
@@ -376,7 +385,7 @@ public class RowBatch {
                 }
                 break;
             case "DATETIME":
-                if (minorType.equals(Types.MinorType.VARCHAR)) {
+                if (minorType.equals(MinorType.VARCHAR)) {
                     VarCharVector varCharVector = (VarCharVector) fieldVector;
                     if (varCharVector.isNull(rowIndex)) {
                         addValueToRow(rowIndex, null);
@@ -400,7 +409,7 @@ public class RowBatch {
                 }
                 break;
             case "DATETIMEV2":
-                if (minorType.equals(Types.MinorType.VARCHAR)) {
+                if (minorType.equals(MinorType.VARCHAR)) {
                     VarCharVector varCharVector = (VarCharVector) fieldVector;
                     if (varCharVector.isNull(rowIndex)) {
                         addValueToRow(rowIndex, null);
@@ -424,11 +433,11 @@ public class RowBatch {
                 }
                 break;
             case "LARGEINT":
-                if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY)
-                        && !minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(MinorType.FIXEDSIZEBINARY)
+                        && !minorType.equals(MinorType.VARCHAR)) {
                     return false;
                 }
-                if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) {
+                if (minorType.equals(MinorType.FIXEDSIZEBINARY)) {
                     FixedSizeBinaryVector largeIntVector = 
(FixedSizeBinaryVector) fieldVector;
                     if (largeIntVector.isNull(rowIndex)) {
                         addValueToRow(rowIndex, null);
@@ -464,7 +473,7 @@ public class RowBatch {
             case "JSONB":
             case "JSON":
             case "VARIANT":
-                if (!minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(MinorType.VARCHAR)) {
                     return false;
                 }
                 VarCharVector varCharVector = (VarCharVector) fieldVector;
@@ -477,21 +486,27 @@ public class RowBatch {
                 addValueToRow(rowIndex, stringValue);
                 break;
             case "IPV6":
-                if (!minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(MinorType.VARCHAR)) {
                     return false;
                 }
-                VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
-                if (ipv6VarcharVector.isNull(rowIndex)) {
+
+                if (fieldVector.isNull(rowIndex)) {
                     addValueToRow(rowIndex, null);
                     break;
                 }
+
+                VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
                 String ipv6Str =
                         new String(ipv6VarcharVector.get(rowIndex), 
StandardCharsets.UTF_8);
-                String ipv6Address = IPUtils.fromBigInteger(new 
BigInteger(ipv6Str));
-                addValueToRow(rowIndex, ipv6Address);
+                if (ipv6Str.contains(":")) {
+                    addValueToRow(rowIndex, ipv6Str);
+                } else {
+                    String ipv6Address = IPUtils.fromBigInteger(new 
BigInteger(ipv6Str));
+                    addValueToRow(rowIndex, ipv6Address);
+                }
                 break;
             case "ARRAY":
-                if (!minorType.equals(Types.MinorType.LIST)) {
+                if (!minorType.equals(MinorType.LIST)) {
                     return false;
                 }
                 ListVector listVector = (ListVector) fieldVector;
@@ -501,7 +516,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, listValue);
                 break;
             case "MAP":
-                if (!minorType.equals(Types.MinorType.MAP)) {
+                if (!minorType.equals(MinorType.MAP)) {
                     return false;
                 }
                 MapVector mapVector = (MapVector) fieldVector;
@@ -522,7 +537,7 @@ public class RowBatch {
                 addValueToRow(rowIndex, mapValue);
                 break;
             case "STRUCT":
-                if (!minorType.equals(Types.MinorType.STRUCT)) {
+                if (!minorType.equals(MinorType.STRUCT)) {
                     return false;
                 }
                 StructVector structVector = (StructVector) fieldVector;
@@ -627,7 +642,7 @@ public class RowBatch {
         return rowBatch.get(offsetInRowBatch++).getCols();
     }
 
-    private String typeMismatchMessage(final String flinkType, final 
Types.MinorType arrowType) {
+    private String typeMismatchMessage(final String flinkType, final MinorType 
arrowType) {
         final String messageTemplate = "FLINK type is %1$s, but arrow type is 
%2$s.";
         return String.format(messageTemplate, flinkType, arrowType.name());
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 3d9ac261..b13cfe15 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -52,6 +52,7 @@ import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
@@ -964,15 +965,16 @@ public class TestRowBatch {
 
     @Test
     public void testIPV6() throws DorisException, IOException {
-
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
-        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        ImmutableList<Field> childrenFields =
+                ImmutableList.of(
+                        new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null),
+                        new Field("k2", FieldType.nullable(new 
ArrowType.Utf8()), null));
 
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(
-                        new org.apache.arrow.vector.types.pojo.Schema(
-                                childrenBuilder.build(), null),
+                        new 
org.apache.arrow.vector.types.pojo.Schema(childrenFields, null),
                         new RootAllocator(Integer.MAX_VALUE));
+
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter =
                 new ArrowStreamWriter(
@@ -981,52 +983,55 @@ public class TestRowBatch {
         arrowStreamWriter.start();
         root.setRowCount(13);
 
-        FieldVector vector = root.getVector("k1");
-        VarCharVector ipv6Vector = (VarCharVector) vector;
+        VarCharVector ipv6Vector = (VarCharVector) root.getVector("k1");
+        VarCharVector ipv6Vector1 = (VarCharVector) root.getVector("k2");
         ipv6Vector.setInitialCapacity(13);
         ipv6Vector.allocateNew();
-        ipv6Vector.setIndexDefined(0);
-        ipv6Vector.setValueLengthSafe(0, 1);
-        ipv6Vector.setSafe(0, "0".getBytes());
-
-        ipv6Vector.setIndexDefined(1);
-        ipv6Vector.setValueLengthSafe(0, 1);
-        ipv6Vector.setSafe(1, "1".getBytes());
-
-        ipv6Vector.setIndexDefined(2);
-        ipv6Vector.setSafe(2, "65535".getBytes());
-
-        ipv6Vector.setIndexDefined(3);
-        ipv6Vector.setSafe(3, "65536".getBytes());
-
-        ipv6Vector.setIndexDefined(4);
-        ipv6Vector.setSafe(4, "4294967295".getBytes());
-
-        ipv6Vector.setIndexDefined(5);
-        ipv6Vector.setSafe(5, "4294967296".getBytes());
-
-        ipv6Vector.setIndexDefined(6);
-        ipv6Vector.setSafe(6, "8589934591".getBytes());
-
-        ipv6Vector.setIndexDefined(7);
-        ipv6Vector.setSafe(7, "281470681743359".getBytes());
-
-        ipv6Vector.setIndexDefined(8);
-        ipv6Vector.setSafe(8, "281470681743360".getBytes());
-
-        ipv6Vector.setIndexDefined(9);
-        ipv6Vector.setSafe(9, "281474976710655".getBytes());
-
-        ipv6Vector.setIndexDefined(10);
-        ipv6Vector.setSafe(10, "281474976710656".getBytes());
-
-        ipv6Vector.setIndexDefined(11);
-        ipv6Vector.setSafe(11, 
"340277174624079928635746639885392347137".getBytes());
-
-        ipv6Vector.setIndexDefined(12);
-        ipv6Vector.setSafe(12, 
"340282366920938463463374607431768211455".getBytes());
+        ipv6Vector1.setInitialCapacity(13);
+        ipv6Vector1.allocateNew(13);
+
+        String[] k1Values = {
+            "0",
+            "1",
+            "65535",
+            "65536",
+            "4294967295",
+            "4294967296",
+            "8589934591",
+            "281470681743359",
+            "281470681743360",
+            "281474976710655",
+            "281474976710656",
+            "340277174624079928635746639885392347137",
+            "340282366920938463463374607431768211455"
+        };
+
+        String[] k2Values = {
+            "::",
+            "::1",
+            "::ffff",
+            "::0.1.0.0",
+            "::255.255.255.255",
+            "::1:0:0",
+            "::1:ffff:ffff",
+            "::fffe:ffff:ffff",
+            "::ffff:0.0.0.0",
+            "::ffff:255.255.255.255",
+            "::1:0:0:0",
+            "ffff::1:ffff:ffff:1",
+            "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
+        };
+
+        for (int i = 0; i < 13; i++) {
+            ipv6Vector.setIndexDefined(i);
+            ipv6Vector.setSafe(i, k1Values[i].getBytes());
+
+            ipv6Vector1.setIndexDefined(i);
+            ipv6Vector1.setSafe(i, k2Values[i].getBytes());
+        }
 
-        vector.setValueCount(13);
+        ipv6Vector.setValueCount(13);
+        ipv6Vector1.setValueCount(13);
         arrowStreamWriter.writeBatch();
 
         arrowStreamWriter.end();
@@ -1041,63 +1046,36 @@ public class TestRowBatch {
 
         String schemaStr =
                 "{\"properties\":["
-                        + 
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}"
+                        + 
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"},"
+                        + 
"{\"type\":\"IPV6\",\"name\":\"k2\",\"comment\":\"\"}"
                         + "], \"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow0 = rowBatch.next();
-        assertEquals("::", actualRow0.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow1 = rowBatch.next();
-        assertEquals("::1", actualRow1.get(0));
 
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow2 = rowBatch.next();
-        assertEquals("::ffff", actualRow2.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow3 = rowBatch.next();
-        assertEquals("::0.1.0.0", actualRow3.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow4 = rowBatch.next();
-        assertEquals("::255.255.255.255", actualRow4.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow5 = rowBatch.next();
-        assertEquals("::1:0:0", actualRow5.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow6 = rowBatch.next();
-        assertEquals("::1:ffff:ffff", actualRow6.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow7 = rowBatch.next();
-        assertEquals("::fffe:ffff:ffff", actualRow7.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow8 = rowBatch.next();
-        assertEquals("::ffff:0.0.0.0", actualRow8.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow9 = rowBatch.next();
-        assertEquals("::ffff:255.255.255.255", actualRow9.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow10 = rowBatch.next();
-        assertEquals("::1:0:0:0", actualRow10.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow11 = rowBatch.next();
-        assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow12 = rowBatch.next();
-        assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", 
actualRow12.get(0));
+        String[][] expectedResults = {
+            {"::", "::"},
+            {"::1", "::1"},
+            {"::ffff", "::ffff"},
+            {"::0.1.0.0", "::0.1.0.0"},
+            {"::255.255.255.255", "::255.255.255.255"},
+            {"::1:0:0", "::1:0:0"},
+            {"::1:ffff:ffff", "::1:ffff:ffff"},
+            {"::fffe:ffff:ffff", "::fffe:ffff:ffff"},
+            {"::ffff:0.0.0.0", "::ffff:0.0.0.0"},
+            {"::ffff:255.255.255.255", "::ffff:255.255.255.255"},
+            {"::1:0:0:0", "::1:0:0:0"},
+            {"ffff::1:ffff:ffff:1", "ffff::1:ffff:ffff:1"},
+            {"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", 
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"}
+        };
+
+        for (String[] expectedResult : expectedResults) {
+            Assert.assertTrue(rowBatch.hasNext());
+            List<Object> actualRow = rowBatch.next();
+            assertEquals(expectedResult[0], actualRow.get(0));
+            assertEquals(expectedResult[1], actualRow.get(1));
+        }
 
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
@@ -1107,17 +1085,17 @@ public class TestRowBatch {
 
     @Test
     public void testIPV4() throws DorisException, IOException {
-
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
-        childrenBuilder.add(
-                new Field("k1", FieldType.nullable(new ArrowType.Int(32, 
false)), null),
-                new Field("k2", FieldType.nullable(new ArrowType.Int(32, 
true)), null));
+        ImmutableList<Field> fields =
+                ImmutableList.of(
+                        new Field("k1", FieldType.nullable(new 
ArrowType.Int(32, false)), null),
+                        new Field("k2", FieldType.nullable(new 
ArrowType.Int(32, true)), null),
+                        new Field("k3", FieldType.nullable(new 
ArrowType.Utf8()), null));
 
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(
-                        new org.apache.arrow.vector.types.pojo.Schema(
-                                childrenBuilder.build(), null),
+                        new org.apache.arrow.vector.types.pojo.Schema(fields, 
null),
                         new RootAllocator(Integer.MAX_VALUE));
+
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter =
                 new ArrowStreamWriter(
@@ -1126,37 +1104,48 @@ public class TestRowBatch {
         arrowStreamWriter.start();
         root.setRowCount(5);
 
-        FieldVector vector = root.getVector("k1");
-        UInt4Vector uInt4Vector = (UInt4Vector) vector;
+        long[] ipValues = {0, 255, 65535, 16777215, 4294967295L};
+        String[] ipStrings = {
+            "0.0.0.0", "0.0.0.255", "0.0.255.255", "0.255.255.255", 
"255.255.255.255"
+        };
+
+        UInt4Vector uInt4Vector = (UInt4Vector) root.getVector("k1");
         uInt4Vector.setInitialCapacity(5);
-        uInt4Vector.allocateNew(4);
-        uInt4Vector.setIndexDefined(0);
-        uInt4Vector.setSafe(0, 0);
-        uInt4Vector.setIndexDefined(1);
-        uInt4Vector.setSafe(1, 255);
-        uInt4Vector.setIndexDefined(2);
-        uInt4Vector.setSafe(2, 65535);
-        uInt4Vector.setIndexDefined(3);
-        uInt4Vector.setSafe(3, 16777215);
-        uInt4Vector.setIndexDefined(4);
-        uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
+        uInt4Vector.allocateNew(5); // Fixed from 4 to 5 to match actual row 
count
 
-        FieldVector vector1 = root.getVector("k2");
-        IntVector intVector = (IntVector) vector1;
+        IntVector intVector = (IntVector) root.getVector("k2");
         intVector.setInitialCapacity(5);
-        intVector.allocateNew(4);
-        intVector.setIndexDefined(0);
-        intVector.setSafe(0, 0);
-        intVector.setIndexDefined(1);
-        intVector.setSafe(1, 255);
-        intVector.setIndexDefined(2);
-        intVector.setSafe(2, 65535);
-        intVector.setIndexDefined(3);
-        intVector.setSafe(3, 16777215);
-        intVector.setIndexDefined(4);
-        intVector.setWithPossibleTruncate(4, 4294967295L);
-        vector.setValueCount(5);
-        vector1.setValueCount(5);
+        intVector.allocateNew(5); // Fixed from 4 to 5 to match actual row 
count
+
+        VarCharVector varCharVector = (VarCharVector) root.getVector("k3");
+        varCharVector.setInitialCapacity(5);
+        varCharVector.allocateNew(5); // Fixed from 4 to 5 to match actual row 
count
+
+        for (int i = 0; i < 5; i++) {
+            uInt4Vector.setIndexDefined(i);
+            if (i < 4) {
+                uInt4Vector.setSafe(i, (int) ipValues[i]);
+            } else {
+                uInt4Vector.setWithPossibleTruncate(
+                        i, ipValues[i]); // Large value that might be truncated
+            }
+
+            intVector.setIndexDefined(i);
+            if (i < 4) {
+                intVector.setSafe(i, (int) ipValues[i]);
+            } else {
+                intVector.setWithPossibleTruncate(
+                        i, ipValues[i]); // Large value that might be truncated
+            }
+
+            varCharVector.setIndexDefined(i);
+            byte[] bytes = ipStrings[i].getBytes(StandardCharsets.UTF_8);
+            varCharVector.setSafe(i, bytes, 0, bytes.length);
+        }
+
+        uInt4Vector.setValueCount(5);
+        intVector.setValueCount(5);
+        varCharVector.setValueCount(5);
         arrowStreamWriter.writeBatch();
 
         arrowStreamWriter.end();
@@ -1172,30 +1161,25 @@ public class TestRowBatch {
         String schemaStr =
                 "{\"properties\":["
                         + 
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"},"
-                        + 
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
+                        + 
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"},"
+                        + 
"{\"type\":\"IPV4\",\"name\":\"k3\",\"comment\":\"\"}"
                         + "], \"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow0 = rowBatch.next();
-        assertEquals("0.0.0.0", actualRow0.get(0));
-        assertEquals("0.0.0.0", actualRow0.get(1));
-        List<Object> actualRow1 = rowBatch.next();
-        assertEquals("0.0.0.255", actualRow1.get(0));
-        assertEquals("0.0.0.255", actualRow1.get(1));
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow2 = rowBatch.next();
-        assertEquals("0.0.255.255", actualRow2.get(0));
-        assertEquals("0.0.255.255", actualRow2.get(1));
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow3 = rowBatch.next();
-        assertEquals("0.255.255.255", actualRow3.get(0));
-        assertEquals("0.255.255.255", actualRow3.get(1));
-        List<Object> actualRow4 = rowBatch.next();
-        assertEquals("255.255.255.255", actualRow4.get(0));
-        assertEquals("255.255.255.255", actualRow4.get(1));
+
+        // Validate each row of data
+        for (int i = 0; i < 5; i++) {
+            Assert.assertTrue(
+                    "Expected row " + i + " to exist, but it doesn't", 
rowBatch.hasNext());
+            List<Object> actualRow = rowBatch.next();
+            assertEquals(ipStrings[i], actualRow.get(0));
+            assertEquals(ipStrings[i], actualRow.get(1));
+            assertEquals(ipStrings[i], actualRow.get(2));
+        }
+
+        // Ensure no more rows exist
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
         thrown.expectMessage(startsWith("Get row offset:"));
@@ -1309,7 +1293,7 @@ public class TestRowBatch {
         flag = rowBatch.doConvert(1, 1, Types.MinorType.VARCHAR, "INT", null);
         Assert.assertFalse(flag);
 
-        flag = rowBatch.doConvert(1, 1, Types.MinorType.VARCHAR, "IPV4", null);
+        flag = rowBatch.doConvert(1, 1, MinorType.BIGINT, "IPV4", null);
         Assert.assertFalse(flag);
 
         flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "BIGINT", null);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to