This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 3294588 [connector](read) compatible with ip data type (#289) 3294588 is described below commit 3294588498a54d08ca9ed18f419ad1eeb2b239e5 Author: Petrichor <1401597...@qq.com> AuthorDate: Fri Mar 21 13:58:15 2025 +0800 [connector](read) compatible with ip data type (#289) --- .../apache/doris/spark/client/read/RowBatch.java | 31 ++- .../doris/spark/client/read/RowBatchTest.java | 295 ++++++++++----------- 2 files changed, 158 insertions(+), 168 deletions(-) diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java index 86ad0cc..bf576f1 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java @@ -282,18 +282,19 @@ public class RowBatch implements Serializable { } break; case "IPV4": - Preconditions.checkArgument(mt.equals(MinorType.UINT4) || mt.equals(MinorType.INT), + Preconditions.checkArgument(mt.equals(MinorType.UINT4) || mt.equals(MinorType.INT) || mt.equals(MinorType.VARCHAR), typeMismatchMessage(colName, currentType, mt)); - BaseIntVector ipv4Vector; - if (mt.equals(MinorType.INT)) { - ipv4Vector = (IntVector) curFieldVector; + + if (mt.equals(MinorType.VARCHAR)) { + VarCharVector vector = (VarCharVector) curFieldVector; + for (int i = 0; i < rowCountInOneBatch; i++) { + addValueToRow(i, vector.isNull(i) ? null : new String(vector.get(i))); + } } else { - ipv4Vector = (UInt4Vector) curFieldVector; - } - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = ipv4Vector.isNull(rowIndex) ? null : - IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex)); - addValueToRow(rowIndex, fieldValue); + BaseIntVector vector = (mt.equals(MinorType.INT)) ? (IntVector) curFieldVector : (UInt4Vector) curFieldVector; + for (int i = 0; i < rowCountInOneBatch; i++) { + addValueToRow(i, vector.isNull(i) ? null : IPUtils.convertLongToIPv4Address(vector.getValueAsLong(i))); + } } break; case "FLOAT": @@ -464,9 +465,15 @@ public class RowBatch implements Serializable { addValueToRow(rowIndex, null); break; } + // Compatible with IPv6 in Doris 2.1.3 and above. String ipv6Str = new String(ipv6VarcharVector.get(rowIndex)); - String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); - addValueToRow(rowIndex, ipv6Address); + if (ipv6Str.contains(":")){ + addValueToRow(rowIndex, ipv6Str); + }else { + String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); + addValueToRow(rowIndex, ipv6Address); + } + } break; case "ARRAY": diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java index acc7712..3b99c18 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java +++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java @@ -924,16 +924,17 @@ public class RowBatchTest { @Test public void testIPv4() throws DorisException, IOException { - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); - childrenBuilder.add( - new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null), - new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null)); + ImmutableList<Field> fields = + ImmutableList.of( + new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null), + new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("k3", FieldType.nullable(new ArrowType.Utf8()), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema( - childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(fields, null), new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( @@ -942,38 +943,48 @@ public class RowBatchTest { arrowStreamWriter.start(); root.setRowCount(5); - FieldVector vector = root.getVector("k1"); - UInt4Vector uInt4Vector = (UInt4Vector) vector; + long[] ipValues = {0, 255, 65535, 16777215, 4294967295L}; + String[] ipStrings = { + "0.0.0.0", "0.0.0.255", "0.0.255.255", "0.255.255.255", "255.255.255.255" + }; + + UInt4Vector uInt4Vector = (UInt4Vector) root.getVector("k1"); uInt4Vector.setInitialCapacity(5); - uInt4Vector.allocateNew(); - uInt4Vector.setIndexDefined(0); - uInt4Vector.setSafe(0, 0); - uInt4Vector.setIndexDefined(1); - uInt4Vector.setSafe(1, 255); - uInt4Vector.setIndexDefined(2); - uInt4Vector.setSafe(2, 65535); - uInt4Vector.setIndexDefined(3); - uInt4Vector.setSafe(3, 16777215); - uInt4Vector.setIndexDefined(4); - uInt4Vector.setWithPossibleTruncate(4, 4294967295L); - - FieldVector vector1 = root.getVector("k2"); - IntVector int4Vector = (IntVector) vector1; - int4Vector.setInitialCapacity(5); - int4Vector.allocateNew(); - int4Vector.setIndexDefined(0); - int4Vector.setSafe(0, 0); - int4Vector.setIndexDefined(1); - int4Vector.setSafe(1, 255); - int4Vector.setIndexDefined(2); - int4Vector.setSafe(2, 65535); - int4Vector.setIndexDefined(3); - int4Vector.setSafe(3, 16777215); - int4Vector.setIndexDefined(4); - int4Vector.setWithPossibleTruncate(4, 4294967295L); - - vector.setValueCount(5); - vector1.setValueCount(5); + uInt4Vector.allocateNew(5); // Fixed from 4 to 5 to match actual row count + + IntVector intVector = (IntVector) root.getVector("k2"); + intVector.setInitialCapacity(5); + intVector.allocateNew(5); // Fixed from 4 to 5 to match actual row count + + VarCharVector varCharVector = (VarCharVector) root.getVector("k3"); + varCharVector.setInitialCapacity(5); + varCharVector.allocateNew(5); // Fixed from 4 to 5 to match actual row count + + for (int i = 0; i < 5; i++) { + uInt4Vector.setIndexDefined(i); + if (i < 4) { + uInt4Vector.setSafe(i, (int) ipValues[i]); + } else { + uInt4Vector.setWithPossibleTruncate( + i, ipValues[i]); // Large value that might be truncated + } + + intVector.setIndexDefined(i); + if (i < 4) { + intVector.setSafe(i, (int) ipValues[i]); + } else { + intVector.setWithPossibleTruncate( + i, ipValues[i]); // Large value that might be truncated + } + + varCharVector.setIndexDefined(i); + byte[] bytes = ipStrings[i].getBytes(StandardCharsets.UTF_8); + varCharVector.setSafe(i, bytes, 0, bytes.length); + } + + uInt4Vector.setValueCount(5); + intVector.setValueCount(5); + varCharVector.setValueCount(5); arrowStreamWriter.writeBatch(); arrowStreamWriter.end(); @@ -988,31 +999,26 @@ public class RowBatchTest { String schemaStr = "{\"properties\":[" - + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}, " - + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}" + + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}," + + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}," + + "{\"type\":\"IPV4\",\"name\":\"k3\",\"comment\":\"\"}" + "], \"status\":200}"; Schema schema = RestService.parseSchema(schemaStr, logger); RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow0 = rowBatch.next(); - assertEquals("0.0.0.0", actualRow0.get(0)); - assertEquals("0.0.0.0", actualRow0.get(1)); - List<Object> actualRow1 = rowBatch.next(); - assertEquals("0.0.0.255", actualRow1.get(0)); - assertEquals("0.0.0.255", actualRow1.get(1)); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow2 = rowBatch.next(); - assertEquals("0.0.255.255", actualRow2.get(0)); - assertEquals("0.0.255.255", actualRow2.get(1)); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow3 = rowBatch.next(); - assertEquals("0.255.255.255", actualRow3.get(0)); - assertEquals("0.255.255.255", actualRow3.get(1)); - List<Object> actualRow4 = rowBatch.next(); - assertEquals("255.255.255.255", actualRow4.get(0)); - assertEquals("255.255.255.255", actualRow4.get(1)); + + // Validate each row of data + for (int i = 0; i < 5; i++) { + Assert.assertTrue( + "Expected row " + i + " to exist, but it doesn't", rowBatch.hasNext()); + List<Object> actualRow = rowBatch.next(); + assertEquals(ipStrings[i], actualRow.get(0)); + assertEquals(ipStrings[i], actualRow.get(1)); + assertEquals(ipStrings[i], actualRow.get(2)); + } + + // Ensure no more rows exist Assert.assertFalse(rowBatch.hasNext()); thrown.expect(NoSuchElementException.class); thrown.expectMessage(startsWith("Get row offset:")); @@ -1021,15 +1027,16 @@ public class RowBatchTest { @Test public void testIPv6() throws DorisException, IOException { - - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); - childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); + ImmutableList<Field> childrenFields = + ImmutableList.of( + new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null), + new Field("k2", FieldType.nullable(new ArrowType.Utf8()), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema( - childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(childrenFields, null), new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( @@ -1038,52 +1045,55 @@ public class RowBatchTest { arrowStreamWriter.start(); root.setRowCount(13); - FieldVector vector = root.getVector("k1"); - VarCharVector ipv6Vector = (VarCharVector) vector; + VarCharVector ipv6Vector = (VarCharVector) root.getVector("k1"); + VarCharVector ipv6Vector1 = (VarCharVector) root.getVector("k2"); ipv6Vector.setInitialCapacity(13); ipv6Vector.allocateNew(); - ipv6Vector.setIndexDefined(0); - ipv6Vector.setValueLengthSafe(0, 1); - ipv6Vector.setSafe(0, "0".getBytes()); - - ipv6Vector.setIndexDefined(1); - ipv6Vector.setValueLengthSafe(0, 1); - ipv6Vector.setSafe(1, "1".getBytes()); - - ipv6Vector.setIndexDefined(2); - ipv6Vector.setSafe(2, "65535".getBytes()); - - ipv6Vector.setIndexDefined(3); - ipv6Vector.setSafe(3, "65536".getBytes()); - - ipv6Vector.setIndexDefined(4); - ipv6Vector.setSafe(4, "4294967295".getBytes()); - - ipv6Vector.setIndexDefined(5); - ipv6Vector.setSafe(5, "4294967296".getBytes()); - - ipv6Vector.setIndexDefined(6); - ipv6Vector.setSafe(6, "8589934591".getBytes()); - - ipv6Vector.setIndexDefined(7); - ipv6Vector.setSafe(7, "281470681743359".getBytes()); - - ipv6Vector.setIndexDefined(8); - ipv6Vector.setSafe(8, "281470681743360".getBytes()); - - ipv6Vector.setIndexDefined(9); - ipv6Vector.setSafe(9, "281474976710655".getBytes()); - - ipv6Vector.setIndexDefined(10); - ipv6Vector.setSafe(10, "281474976710656".getBytes()); - - ipv6Vector.setIndexDefined(11); - ipv6Vector.setSafe(11, "340277174624079928635746639885392347137".getBytes()); - - ipv6Vector.setIndexDefined(12); - ipv6Vector.setSafe(12, "340282366920938463463374607431768211455".getBytes()); + ipv6Vector1.setInitialCapacity(13); + ipv6Vector1.allocateNew(13); + + String[] k1Values = { + "0", + "1", + "65535", + "65536", + "4294967295", + "4294967296", + "8589934591", + "281470681743359", + "281470681743360", + "281474976710655", + "281474976710656", + "340277174624079928635746639885392347137", + "340282366920938463463374607431768211455" + }; + + String[] k2Values = { + "::", + "::1", + "::ffff", + "::0.1.0.0", + "::255.255.255.255", + "::1:0:0", + "::1:ffff:ffff", + "::fffe:ffff:ffff", + "::ffff:0.0.0.0", + "::ffff:255.255.255.255", + "::1:0:0:0", + "ffff::1:ffff:ffff:1", + "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff" + }; + + for (int i = 0; i < 13; i++) { + ipv6Vector.setIndexDefined(i); + ipv6Vector.setSafe(i, k1Values[i].getBytes()); + + ipv6Vector1.setIndexDefined(i); + ipv6Vector1.setSafe(i, k2Values[i].getBytes()); + } - vector.setValueCount(13); + ipv6Vector.setValueCount(13); + ipv6Vector1.setValueCount(13); arrowStreamWriter.writeBatch(); arrowStreamWriter.end(); @@ -1098,63 +1108,36 @@ public class RowBatchTest { String schemaStr = "{\"properties\":[" - + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}" + + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}," + + "{\"type\":\"IPV6\",\"name\":\"k2\",\"comment\":\"\"}" + "], \"status\":200}"; Schema schema = RestService.parseSchema(schemaStr, logger); RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow0 = rowBatch.next(); - assertEquals("::", actualRow0.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow1 = rowBatch.next(); - assertEquals("::1", actualRow1.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow2 = rowBatch.next(); - assertEquals("::ffff", actualRow2.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow3 = rowBatch.next(); - assertEquals("::0.1.0.0", actualRow3.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow4 = rowBatch.next(); - assertEquals("::255.255.255.255", actualRow4.get(0)); - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow5 = rowBatch.next(); - assertEquals("::1:0:0", actualRow5.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow6 = rowBatch.next(); - assertEquals("::1:ffff:ffff", actualRow6.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow7 = rowBatch.next(); - assertEquals("::fffe:ffff:ffff", actualRow7.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow8 = rowBatch.next(); - assertEquals("::ffff:0.0.0.0", actualRow8.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow9 = rowBatch.next(); - assertEquals("::ffff:255.255.255.255", actualRow9.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow10 = rowBatch.next(); - assertEquals("::1:0:0:0", actualRow10.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow11 = rowBatch.next(); - assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0)); - - Assert.assertTrue(rowBatch.hasNext()); - List<Object> actualRow12 = rowBatch.next(); - assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", actualRow12.get(0)); + String[][] expectedResults = { + {"::", "::"}, + {"::1", "::1"}, + {"::ffff", "::ffff"}, + {"::0.1.0.0", "::0.1.0.0"}, + {"::255.255.255.255", "::255.255.255.255"}, + {"::1:0:0", "::1:0:0"}, + {"::1:ffff:ffff", "::1:ffff:ffff"}, + {"::fffe:ffff:ffff", "::fffe:ffff:ffff"}, + {"::ffff:0.0.0.0", "::ffff:0.0.0.0"}, + {"::ffff:255.255.255.255", "::ffff:255.255.255.255"}, + {"::1:0:0:0", "::1:0:0:0"}, + {"ffff::1:ffff:ffff:1", "ffff::1:ffff:ffff:1"}, + {"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"} + }; + + for (String[] expectedResult : expectedResults) { + Assert.assertTrue(rowBatch.hasNext()); + List<Object> actualRow = rowBatch.next(); + assertEquals(expectedResult[0], actualRow.get(0)); + assertEquals(expectedResult[1], actualRow.get(1)); + } Assert.assertFalse(rowBatch.hasNext()); thrown.expect(NoSuchElementException.class); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org