This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 39b5fbc4 [source]Add compatibility for IP data types after Doris 2.1.3 (#576) 39b5fbc4 is described below commit 39b5fbc4ba354b1f3f557e7819cf47596bb20972 Author: Petrichor <1401597...@qq.com> AuthorDate: Mon Mar 17 19:15:15 2025 +0800 [source]Add compatibility for IP data types after Doris 2.1.3 (#576) --- .../apache/doris/flink/serialization/RowBatch.java | 109 ++++---- .../doris/flink/serialization/TestRowBatch.java | 292 ++++++++++----------- 2 files changed, 200 insertions(+), 201 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index ddaf6600..f012b9c6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -46,7 +46,7 @@ import org.apache.arrow.vector.complex.impl.UnionMapReader; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.Types.MinorType; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.models.Schema; @@ -177,7 +177,7 @@ public class RowBatch { throw new DorisException( "Load Doris data failed, schema size of fetch data is wrong."); } - if (fieldVectors.size() == 0 || root.getRowCount() == 0) { + if (fieldVectors.isEmpty() || root.getRowCount() == 0) { logger.debug("One batch in arrow has no data."); continue; } @@ -217,12 +217,12 @@ public class RowBatch { try { for (int col = 0; col < fieldVectors.size(); col++) { FieldVector fieldVector = fieldVectors.get(col); - Types.MinorType minorType = 
fieldVector.getMinorType(); + MinorType minorType = fieldVector.getMinorType(); final String currentType = schema.get(col).getType(); for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector); if (!passed) { - throw new java.lang.IllegalArgumentException( + throw new IllegalArgumentException( "FLINK type is " + currentType + ", but arrow type is " @@ -239,17 +239,13 @@ public class RowBatch { @VisibleForTesting public boolean doConvert( - int col, - int rowIndex, - Types.MinorType minorType, - String currentType, - FieldVector fieldVector) + int col, int rowIndex, MinorType minorType, String currentType, FieldVector fieldVector) throws DorisException { switch (currentType) { case "NULL_TYPE": break; case "BOOLEAN": - if (!minorType.equals(Types.MinorType.BIT)) { + if (!minorType.equals(MinorType.BIT)) { return false; } BitVector bitVector = (BitVector) fieldVector; @@ -258,7 +254,7 @@ public class RowBatch { addValueToRow(rowIndex, fieldValue); break; case "TINYINT": - if (!minorType.equals(Types.MinorType.TINYINT)) { + if (!minorType.equals(MinorType.TINYINT)) { return false; } TinyIntVector tinyIntVector = (TinyIntVector) fieldVector; @@ -266,7 +262,7 @@ public class RowBatch { addValueToRow(rowIndex, fieldValue); break; case "SMALLINT": - if (!minorType.equals(Types.MinorType.SMALLINT)) { + if (!minorType.equals(MinorType.SMALLINT)) { return false; } SmallIntVector smallIntVector = (SmallIntVector) fieldVector; @@ -274,7 +270,7 @@ public class RowBatch { addValueToRow(rowIndex, fieldValue); break; case "INT": - if (!minorType.equals(Types.MinorType.INT)) { + if (!minorType.equals(MinorType.INT)) { return false; } IntVector intVector = (IntVector) fieldVector; @@ -282,25 +278,39 @@ public class RowBatch { addValueToRow(rowIndex, fieldValue); break; case "IPV4": - if (!minorType.equals(Types.MinorType.UINT4) - && !minorType.equals(Types.MinorType.INT)) { + if 
(!minorType.equals(MinorType.UINT4) + && !minorType.equals(MinorType.INT) + && !minorType.equals(MinorType.VARCHAR)) { return false; } - BaseIntVector ipv4Vector; - if (minorType.equals(Types.MinorType.INT)) { - ipv4Vector = (IntVector) fieldVector; + if (fieldVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + break; + } + + if (minorType.equals(MinorType.VARCHAR)) { + VarCharVector ipv4VarcharVector = (VarCharVector) fieldVector; + String ipv4Str = + new String(ipv4VarcharVector.get(rowIndex), StandardCharsets.UTF_8); + addValueToRow(rowIndex, ipv4Str); } else { - ipv4Vector = (UInt4Vector) fieldVector; + BaseIntVector ipv4Vector; + if (minorType.equals(MinorType.INT)) { + ipv4Vector = (IntVector) fieldVector; + + } else { + ipv4Vector = (UInt4Vector) fieldVector; + } + fieldValue = + ipv4Vector.isNull(rowIndex) + ? null + : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex)); + addValueToRow(rowIndex, fieldValue); } - fieldValue = - ipv4Vector.isNull(rowIndex) - ? 
null - : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex)); - addValueToRow(rowIndex, fieldValue); break; case "BIGINT": - if (!minorType.equals(Types.MinorType.BIGINT)) { + if (!minorType.equals(MinorType.BIGINT)) { return false; } BigIntVector bigIntVector = (BigIntVector) fieldVector; @@ -308,7 +318,7 @@ public class RowBatch { addValueToRow(rowIndex, fieldValue); break; case "FLOAT": - if (!minorType.equals(Types.MinorType.FLOAT4)) { + if (!minorType.equals(MinorType.FLOAT4)) { return false; } Float4Vector float4Vector = (Float4Vector) fieldVector; @@ -317,7 +327,7 @@ public class RowBatch { break; case "TIME": case "DOUBLE": - if (!minorType.equals(Types.MinorType.FLOAT8)) { + if (!minorType.equals(MinorType.FLOAT8)) { return false; } Float8Vector float8Vector = (Float8Vector) fieldVector; @@ -325,7 +335,7 @@ public class RowBatch { addValueToRow(rowIndex, fieldValue); break; case "BINARY": - if (!minorType.equals(Types.MinorType.VARBINARY)) { + if (!minorType.equals(MinorType.VARBINARY)) { return false; } VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector; @@ -339,7 +349,7 @@ public class RowBatch { case "DECIMAL64": case "DECIMAL128I": case "DECIMAL128": - if (!minorType.equals(Types.MinorType.DECIMAL)) { + if (!minorType.equals(MinorType.DECIMAL)) { return false; } DecimalVector decimalVector = (DecimalVector) fieldVector; @@ -352,11 +362,10 @@ public class RowBatch { break; case "DATE": case "DATEV2": - if (!minorType.equals(Types.MinorType.DATEDAY) - && !minorType.equals(Types.MinorType.VARCHAR)) { + if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) { return false; } - if (minorType.equals(Types.MinorType.VARCHAR)) { + if (minorType.equals(MinorType.VARCHAR)) { VarCharVector date = (VarCharVector) fieldVector; if (date.isNull(rowIndex)) { addValueToRow(rowIndex, null); @@ -376,7 +385,7 @@ public class RowBatch { } break; case "DATETIME": - if (minorType.equals(Types.MinorType.VARCHAR)) { + if 
(minorType.equals(MinorType.VARCHAR)) { VarCharVector varCharVector = (VarCharVector) fieldVector; if (varCharVector.isNull(rowIndex)) { addValueToRow(rowIndex, null); @@ -400,7 +409,7 @@ public class RowBatch { } break; case "DATETIMEV2": - if (minorType.equals(Types.MinorType.VARCHAR)) { + if (minorType.equals(MinorType.VARCHAR)) { VarCharVector varCharVector = (VarCharVector) fieldVector; if (varCharVector.isNull(rowIndex)) { addValueToRow(rowIndex, null); @@ -424,11 +433,11 @@ public class RowBatch { } break; case "LARGEINT": - if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY) - && !minorType.equals(Types.MinorType.VARCHAR)) { + if (!minorType.equals(MinorType.FIXEDSIZEBINARY) + && !minorType.equals(MinorType.VARCHAR)) { return false; } - if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) { + if (minorType.equals(MinorType.FIXEDSIZEBINARY)) { FixedSizeBinaryVector largeIntVector = (FixedSizeBinaryVector) fieldVector; if (largeIntVector.isNull(rowIndex)) { addValueToRow(rowIndex, null); @@ -464,7 +473,7 @@ public class RowBatch { case "JSONB": case "JSON": case "VARIANT": - if (!minorType.equals(Types.MinorType.VARCHAR)) { + if (!minorType.equals(MinorType.VARCHAR)) { return false; } VarCharVector varCharVector = (VarCharVector) fieldVector; @@ -477,21 +486,27 @@ public class RowBatch { addValueToRow(rowIndex, stringValue); break; case "IPV6": - if (!minorType.equals(Types.MinorType.VARCHAR)) { + if (!minorType.equals(MinorType.VARCHAR)) { return false; } - VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector; - if (ipv6VarcharVector.isNull(rowIndex)) { + + if (fieldVector.isNull(rowIndex)) { addValueToRow(rowIndex, null); break; } + + VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector; String ipv6Str = new String(ipv6VarcharVector.get(rowIndex), StandardCharsets.UTF_8); - String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); - addValueToRow(rowIndex, ipv6Address); + if (ipv6Str.contains(":")) { + 
addValueToRow(rowIndex, ipv6Str); + } else { + String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); + addValueToRow(rowIndex, ipv6Address); + } break; case "ARRAY": - if (!minorType.equals(Types.MinorType.LIST)) { + if (!minorType.equals(MinorType.LIST)) { return false; } ListVector listVector = (ListVector) fieldVector; @@ -501,7 +516,7 @@ public class RowBatch { addValueToRow(rowIndex, listValue); break; case "MAP": - if (!minorType.equals(Types.MinorType.MAP)) { + if (!minorType.equals(MinorType.MAP)) { return false; } MapVector mapVector = (MapVector) fieldVector; @@ -522,7 +537,7 @@ public class RowBatch { addValueToRow(rowIndex, mapValue); break; case "STRUCT": - if (!minorType.equals(Types.MinorType.STRUCT)) { + if (!minorType.equals(MinorType.STRUCT)) { return false; } StructVector structVector = (StructVector) fieldVector; @@ -627,7 +642,7 @@ public class RowBatch { return rowBatch.get(offsetInRowBatch++).getCols(); } - private String typeMismatchMessage(final String flinkType, final Types.MinorType arrowType) { + private String typeMismatchMessage(final String flinkType, final MinorType arrowType) { final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s."; return String.format(messageTemplate, flinkType, arrowType.name()); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 3d9ac261..b13cfe15 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -52,6 +52,7 @@ import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.Types.MinorType; import 
org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; @@ -964,15 +965,16 @@ public class TestRowBatch { @Test public void testIPV6() throws DorisException, IOException { - - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); - childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); + ImmutableList<Field> childrenFields = + ImmutableList.of( + new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null), + new Field("k2", FieldType.nullable(new ArrowType.Utf8()), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema( - childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(childrenFields, null), new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( @@ -981,52 +983,55 @@ public class TestRowBatch { arrowStreamWriter.start(); root.setRowCount(13); - FieldVector vector = root.getVector("k1"); - VarCharVector ipv6Vector = (VarCharVector) vector; + VarCharVector ipv6Vector = (VarCharVector) root.getVector("k1"); + VarCharVector ipv6Vector1 = (VarCharVector) root.getVector("k2"); ipv6Vector.setInitialCapacity(13); ipv6Vector.allocateNew(); - ipv6Vector.setIndexDefined(0); - ipv6Vector.setValueLengthSafe(0, 1); - ipv6Vector.setSafe(0, "0".getBytes()); - - ipv6Vector.setIndexDefined(1); - ipv6Vector.setValueLengthSafe(0, 1); - ipv6Vector.setSafe(1, "1".getBytes()); - - ipv6Vector.setIndexDefined(2); - ipv6Vector.setSafe(2, "65535".getBytes()); - - ipv6Vector.setIndexDefined(3); - ipv6Vector.setSafe(3, "65536".getBytes()); - - ipv6Vector.setIndexDefined(4); - ipv6Vector.setSafe(4, "4294967295".getBytes()); - - ipv6Vector.setIndexDefined(5); - ipv6Vector.setSafe(5, "4294967296".getBytes()); - - ipv6Vector.setIndexDefined(6); - 
ipv6Vector.setSafe(6, "8589934591".getBytes()); - - ipv6Vector.setIndexDefined(7); - ipv6Vector.setSafe(7, "281470681743359".getBytes()); - - ipv6Vector.setIndexDefined(8); - ipv6Vector.setSafe(8, "281470681743360".getBytes()); - - ipv6Vector.setIndexDefined(9); - ipv6Vector.setSafe(9, "281474976710655".getBytes()); - - ipv6Vector.setIndexDefined(10); - ipv6Vector.setSafe(10, "281474976710656".getBytes()); - - ipv6Vector.setIndexDefined(11); - ipv6Vector.setSafe(11, "340277174624079928635746639885392347137".getBytes()); - - ipv6Vector.setIndexDefined(12); - ipv6Vector.setSafe(12, "340282366920938463463374607431768211455".getBytes()); + ipv6Vector1.setInitialCapacity(13); + ipv6Vector1.allocateNew(13); + + String[] k1Values = { + "0", + "1", + "65535", + "65536", + "4294967295", + "4294967296", + "8589934591", + "281470681743359", + "281470681743360", + "281474976710655", + "281474976710656", + "340277174624079928635746639885392347137", + "340282366920938463463374607431768211455" + }; + + String[] k2Values = { + "::", + "::1", + "::ffff", + "::0.1.0.0", + "::255.255.255.255", + "::1:0:0", + "::1:ffff:ffff", + "::fffe:ffff:ffff", + "::ffff:0.0.0.0", + "::ffff:255.255.255.255", + "::1:0:0:0", + "ffff::1:ffff:ffff:1", + "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" + }; + + for (int i = 0; i < 13; i++) { + ipv6Vector.setIndexDefined(i); + ipv6Vector.setSafe(i, k1Values[i].getBytes()); + + ipv6Vector1.setIndexDefined(i); + ipv6Vector1.setSafe(i, k2Values[i].getBytes()); + } - vector.setValueCount(13); + ipv6Vector.setValueCount(13); + ipv6Vector1.setValueCount(13); arrowStreamWriter.writeBatch(); arrowStreamWriter.end(); @@ -1041,63 +1046,36 @@ public class TestRowBatch { String schemaStr = "{\"properties\":[" - + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}" + + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}," + + "{\"type\":\"IPV6\",\"name\":\"k2\",\"comment\":\"\"}" + "], \"status\":200}"; Schema schema = RestService.parseSchema(schemaStr, logger); 
RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow0 = rowBatch.next(); - assertEquals("::", actualRow0.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow1 = rowBatch.next(); - assertEquals("::1", actualRow1.get(0)); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow2 = rowBatch.next(); - assertEquals("::ffff", actualRow2.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow3 = rowBatch.next(); - assertEquals("::0.1.0.0", actualRow3.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow4 = rowBatch.next(); - assertEquals("::255.255.255.255", actualRow4.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow5 = rowBatch.next(); - assertEquals("::1:0:0", actualRow5.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow6 = rowBatch.next(); - assertEquals("::1:ffff:ffff", actualRow6.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow7 = rowBatch.next(); - assertEquals("::fffe:ffff:ffff", actualRow7.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow8 = rowBatch.next(); - assertEquals("::ffff:0.0.0.0", actualRow8.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow9 = rowBatch.next(); - assertEquals("::ffff:255.255.255.255", actualRow9.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow10 = rowBatch.next(); - assertEquals("::1:0:0:0", actualRow10.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow11 = rowBatch.next(); - assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow12 = rowBatch.next(); - assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", actualRow12.get(0)); + String[][] expectedResults = { + {"::", "::"}, + {"::1", "::1"}, + {"::ffff", "::ffff"}, + 
{"::0.1.0.0", "::0.1.0.0"}, + {"::255.255.255.255", "::255.255.255.255"}, + {"::1:0:0", "::1:0:0"}, + {"::1:ffff:ffff", "::1:ffff:ffff"}, + {"::fffe:ffff:ffff", "::fffe:ffff:ffff"}, + {"::ffff:0.0.0.0", "::ffff:0.0.0.0"}, + {"::ffff:255.255.255.255", "::ffff:255.255.255.255"}, + {"::1:0:0:0", "::1:0:0:0"}, + {"ffff::1:ffff:ffff:1", "ffff::1:ffff:ffff:1"}, + {"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"} + }; + + for (String[] expectedResult : expectedResults) { + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow = rowBatch.next(); + assertEquals(expectedResult[0], actualRow.get(0)); + assertEquals(expectedResult[1], actualRow.get(1)); + } Assert.assertFalse(rowBatch.hasNext()); thrown.expect(NoSuchElementException.class); @@ -1107,17 +1085,17 @@ public class TestRowBatch { @Test public void testIPV4() throws DorisException, IOException { - - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); - childrenBuilder.add( - new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null), - new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null)); + ImmutableList<Field> fields = + ImmutableList.of( + new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null), + new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("k3", FieldType.nullable(new ArrowType.Utf8()), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema( - childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(fields, null), new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( @@ -1126,37 +1104,48 @@ public class TestRowBatch { arrowStreamWriter.start(); root.setRowCount(5); - FieldVector vector = root.getVector("k1"); - UInt4Vector uInt4Vector = (UInt4Vector) vector; + long[] ipValues = 
{0, 255, 65535, 16777215, 4294967295L}; + String[] ipStrings = { + "0.0.0.0", "0.0.0.255", "0.0.255.255", "0.255.255.255", "255.255.255.255" + }; + + UInt4Vector uInt4Vector = (UInt4Vector) root.getVector("k1"); uInt4Vector.setInitialCapacity(5); - uInt4Vector.allocateNew(4); - uInt4Vector.setIndexDefined(0); - uInt4Vector.setSafe(0, 0); - uInt4Vector.setIndexDefined(1); - uInt4Vector.setSafe(1, 255); - uInt4Vector.setIndexDefined(2); - uInt4Vector.setSafe(2, 65535); - uInt4Vector.setIndexDefined(3); - uInt4Vector.setSafe(3, 16777215); - uInt4Vector.setIndexDefined(4); - uInt4Vector.setWithPossibleTruncate(4, 4294967295L); + uInt4Vector.allocateNew(5); // Fixed from 4 to 5 to match actual row count - FieldVector vector1 = root.getVector("k2"); - IntVector intVector = (IntVector) vector1; + IntVector intVector = (IntVector) root.getVector("k2"); intVector.setInitialCapacity(5); - intVector.allocateNew(4); - intVector.setIndexDefined(0); - intVector.setSafe(0, 0); - intVector.setIndexDefined(1); - intVector.setSafe(1, 255); - intVector.setIndexDefined(2); - intVector.setSafe(2, 65535); - intVector.setIndexDefined(3); - intVector.setSafe(3, 16777215); - intVector.setIndexDefined(4); - intVector.setWithPossibleTruncate(4, 4294967295L); - vector.setValueCount(5); - vector1.setValueCount(5); + intVector.allocateNew(5); // Fixed from 4 to 5 to match actual row count + + VarCharVector varCharVector = (VarCharVector) root.getVector("k3"); + varCharVector.setInitialCapacity(5); + varCharVector.allocateNew(5); // Fixed from 4 to 5 to match actual row count + + for (int i = 0; i < 5; i++) { + uInt4Vector.setIndexDefined(i); + if (i < 4) { + uInt4Vector.setSafe(i, (int) ipValues[i]); + } else { + uInt4Vector.setWithPossibleTruncate( + i, ipValues[i]); // Large value that might be truncated + } + + intVector.setIndexDefined(i); + if (i < 4) { + intVector.setSafe(i, (int) ipValues[i]); + } else { + intVector.setWithPossibleTruncate( + i, ipValues[i]); // Large value that might be 
truncated + } + + varCharVector.setIndexDefined(i); + byte[] bytes = ipStrings[i].getBytes(StandardCharsets.UTF_8); + varCharVector.setSafe(i, bytes, 0, bytes.length); + } + + uInt4Vector.setValueCount(5); + intVector.setValueCount(5); + varCharVector.setValueCount(5); arrowStreamWriter.writeBatch(); arrowStreamWriter.end(); @@ -1172,30 +1161,25 @@ public class TestRowBatch { String schemaStr = "{\"properties\":[" + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}," - + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}" + + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}," + + "{\"type\":\"IPV4\",\"name\":\"k3\",\"comment\":\"\"}" + "], \"status\":200}"; Schema schema = RestService.parseSchema(schemaStr, logger); RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow0 = rowBatch.next(); - assertEquals("0.0.0.0", actualRow0.get(0)); - assertEquals("0.0.0.0", actualRow0.get(1)); - List<Object> actualRow1 = rowBatch.next(); - assertEquals("0.0.0.255", actualRow1.get(0)); - assertEquals("0.0.0.255", actualRow1.get(1)); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow2 = rowBatch.next(); - assertEquals("0.0.255.255", actualRow2.get(0)); - assertEquals("0.0.255.255", actualRow2.get(1)); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow3 = rowBatch.next(); - assertEquals("0.255.255.255", actualRow3.get(0)); - assertEquals("0.255.255.255", actualRow3.get(1)); - List<Object> actualRow4 = rowBatch.next(); - assertEquals("255.255.255.255", actualRow4.get(0)); - assertEquals("255.255.255.255", actualRow4.get(1)); + + // Validate each row of data + for (int i = 0; i < 5; i++) { + Assert.assertTrue( + "Expected row " + i + " to exist, but it doesn't", rowBatch.hasNext()); + List<Object> actualRow = rowBatch.next(); + assertEquals(ipStrings[i], actualRow.get(0)); + assertEquals(ipStrings[i], actualRow.get(1)); + assertEquals(ipStrings[i], 
actualRow.get(2)); + } + + // Ensure no more rows exist Assert.assertFalse(rowBatch.hasNext()); thrown.expect(NoSuchElementException.class); thrown.expectMessage(startsWith("Get row offset:")); @@ -1309,7 +1293,7 @@ public class TestRowBatch { flag = rowBatch.doConvert(1, 1, Types.MinorType.VARCHAR, "INT", null); Assert.assertFalse(flag); - flag = rowBatch.doConvert(1, 1, Types.MinorType.VARCHAR, "IPV4", null); + flag = rowBatch.doConvert(1, 1, MinorType.BIGINT, "IPV4", null); Assert.assertFalse(flag); flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "BIGINT", null); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org