This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3294588  [connector](read) compatible with ip data type (#289)
3294588 is described below

commit 3294588498a54d08ca9ed18f419ad1eeb2b239e5
Author: Petrichor <1401597...@qq.com>
AuthorDate: Fri Mar 21 13:58:15 2025 +0800

    [connector](read) compatible with ip data type (#289)
---
 .../apache/doris/spark/client/read/RowBatch.java   |  31 ++-
 .../doris/spark/client/read/RowBatchTest.java      | 295 ++++++++++-----------
 2 files changed, 158 insertions(+), 168 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 86ad0cc..bf576f1 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -282,18 +282,19 @@ public class RowBatch implements Serializable {
                         }
                         break;
                     case "IPV4":
-                        Preconditions.checkArgument(mt.equals(MinorType.UINT4) 
|| mt.equals(MinorType.INT),
+                        Preconditions.checkArgument(mt.equals(MinorType.UINT4) 
|| mt.equals(MinorType.INT) || mt.equals(MinorType.VARCHAR),
                                 typeMismatchMessage(colName, currentType, mt));
-                        BaseIntVector ipv4Vector;
-                        if (mt.equals(MinorType.INT)) {
-                            ipv4Vector = (IntVector) curFieldVector;
+
+                        if (mt.equals(MinorType.VARCHAR)) {
+                            VarCharVector vector = (VarCharVector) 
curFieldVector;
+                            for (int i = 0; i < rowCountInOneBatch; i++) {
+                                addValueToRow(i, vector.isNull(i) ? null : new 
String(vector.get(i)));
+                            }
                         } else {
-                            ipv4Vector = (UInt4Vector) curFieldVector;
-                        }
-                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
-                            Object fieldValue = ipv4Vector.isNull(rowIndex) ? 
null :
-                                    
IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
-                            addValueToRow(rowIndex, fieldValue);
+                            BaseIntVector vector = (mt.equals(MinorType.INT)) 
? (IntVector) curFieldVector : (UInt4Vector) curFieldVector;
+                            for (int i = 0; i < rowCountInOneBatch; i++) {
+                                addValueToRow(i, vector.isNull(i) ? null : 
IPUtils.convertLongToIPv4Address(vector.getValueAsLong(i)));
+                            }
                         }
                         break;
                     case "FLOAT":
@@ -464,9 +465,15 @@ public class RowBatch implements Serializable {
                                 addValueToRow(rowIndex, null);
                                 break;
                             }
+                            // Compatible with IPv6  in Doris 2.1.3 and above.
                             String ipv6Str = new 
String(ipv6VarcharVector.get(rowIndex));
-                            String ipv6Address = IPUtils.fromBigInteger(new 
BigInteger(ipv6Str));
-                            addValueToRow(rowIndex, ipv6Address);
+                            if (ipv6Str.contains(":")){
+                                addValueToRow(rowIndex, ipv6Str);
+                            }else {
+                                String ipv6Address = 
IPUtils.fromBigInteger(new BigInteger(ipv6Str));
+                                addValueToRow(rowIndex, ipv6Address);
+                            }
+
                         }
                         break;
                     case "ARRAY":
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index acc7712..3b99c18 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -924,16 +924,17 @@ public class RowBatchTest {
     @Test
     public void testIPv4() throws DorisException, IOException {
 
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
-        childrenBuilder.add(
-                new Field("k1", FieldType.nullable(new ArrowType.Int(32, 
false)), null),
-                new Field("k2", FieldType.nullable(new ArrowType.Int(32, 
true)), null));
+        ImmutableList<Field> fields =
+                ImmutableList.of(
+                        new Field("k1", FieldType.nullable(new 
ArrowType.Int(32, false)), null),
+                        new Field("k2", FieldType.nullable(new 
ArrowType.Int(32, true)), null),
+                        new Field("k3", FieldType.nullable(new 
ArrowType.Utf8()), null));
 
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(
-                        new org.apache.arrow.vector.types.pojo.Schema(
-                                childrenBuilder.build(), null),
+                        new org.apache.arrow.vector.types.pojo.Schema(fields, 
null),
                         new RootAllocator(Integer.MAX_VALUE));
+
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter =
                 new ArrowStreamWriter(
@@ -942,38 +943,48 @@ public class RowBatchTest {
         arrowStreamWriter.start();
         root.setRowCount(5);
 
-        FieldVector vector = root.getVector("k1");
-        UInt4Vector uInt4Vector = (UInt4Vector) vector;
+        long[] ipValues = {0, 255, 65535, 16777215, 4294967295L};
+        String[] ipStrings = {
+                "0.0.0.0", "0.0.0.255", "0.0.255.255", "0.255.255.255", 
"255.255.255.255"
+        };
+
+        UInt4Vector uInt4Vector = (UInt4Vector) root.getVector("k1");
         uInt4Vector.setInitialCapacity(5);
-        uInt4Vector.allocateNew();
-        uInt4Vector.setIndexDefined(0);
-        uInt4Vector.setSafe(0, 0);
-        uInt4Vector.setIndexDefined(1);
-        uInt4Vector.setSafe(1, 255);
-        uInt4Vector.setIndexDefined(2);
-        uInt4Vector.setSafe(2, 65535);
-        uInt4Vector.setIndexDefined(3);
-        uInt4Vector.setSafe(3, 16777215);
-        uInt4Vector.setIndexDefined(4);
-        uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
-
-        FieldVector vector1 = root.getVector("k2");
-        IntVector int4Vector = (IntVector) vector1;
-        int4Vector.setInitialCapacity(5);
-        int4Vector.allocateNew();
-        int4Vector.setIndexDefined(0);
-        int4Vector.setSafe(0, 0);
-        int4Vector.setIndexDefined(1);
-        int4Vector.setSafe(1, 255);
-        int4Vector.setIndexDefined(2);
-        int4Vector.setSafe(2, 65535);
-        int4Vector.setIndexDefined(3);
-        int4Vector.setSafe(3, 16777215);
-        int4Vector.setIndexDefined(4);
-        int4Vector.setWithPossibleTruncate(4, 4294967295L);
-
-        vector.setValueCount(5);
-        vector1.setValueCount(5);
+        uInt4Vector.allocateNew(5); // Fixed from 4 to 5 to match actual row 
count
+
+        IntVector intVector = (IntVector) root.getVector("k2");
+        intVector.setInitialCapacity(5);
+        intVector.allocateNew(5); // Fixed from 4 to 5 to match actual row 
count
+
+        VarCharVector varCharVector = (VarCharVector) root.getVector("k3");
+        varCharVector.setInitialCapacity(5);
+        varCharVector.allocateNew(5); // Fixed from 4 to 5 to match actual row 
count
+
+        for (int i = 0; i < 5; i++) {
+            uInt4Vector.setIndexDefined(i);
+            if (i < 4) {
+                uInt4Vector.setSafe(i, (int) ipValues[i]);
+            } else {
+                uInt4Vector.setWithPossibleTruncate(
+                        i, ipValues[i]); // Large value that might be truncated
+            }
+
+            intVector.setIndexDefined(i);
+            if (i < 4) {
+                intVector.setSafe(i, (int) ipValues[i]);
+            } else {
+                intVector.setWithPossibleTruncate(
+                        i, ipValues[i]); // Large value that might be truncated
+            }
+
+            varCharVector.setIndexDefined(i);
+            byte[] bytes = ipStrings[i].getBytes(StandardCharsets.UTF_8);
+            varCharVector.setSafe(i, bytes, 0, bytes.length);
+        }
+
+        uInt4Vector.setValueCount(5);
+        intVector.setValueCount(5);
+        varCharVector.setValueCount(5);
         arrowStreamWriter.writeBatch();
 
         arrowStreamWriter.end();
@@ -988,31 +999,26 @@ public class RowBatchTest {
 
         String schemaStr =
                 "{\"properties\":["
-                        + 
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}, "
-                        + 
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
+                        + 
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"},"
+                        + 
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"},"
+                        + 
"{\"type\":\"IPV4\",\"name\":\"k3\",\"comment\":\"\"}"
                         + "], \"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow0 = rowBatch.next();
-        assertEquals("0.0.0.0", actualRow0.get(0));
-        assertEquals("0.0.0.0", actualRow0.get(1));
-        List<Object> actualRow1 = rowBatch.next();
-        assertEquals("0.0.0.255", actualRow1.get(0));
-        assertEquals("0.0.0.255", actualRow1.get(1));
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow2 = rowBatch.next();
-        assertEquals("0.0.255.255", actualRow2.get(0));
-        assertEquals("0.0.255.255", actualRow2.get(1));
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow3 = rowBatch.next();
-        assertEquals("0.255.255.255", actualRow3.get(0));
-        assertEquals("0.255.255.255", actualRow3.get(1));
-        List<Object> actualRow4 = rowBatch.next();
-        assertEquals("255.255.255.255", actualRow4.get(0));
-        assertEquals("255.255.255.255", actualRow4.get(1));
+
+        // Validate each row of data
+        for (int i = 0; i < 5; i++) {
+            Assert.assertTrue(
+                    "Expected row " + i + " to exist, but it doesn't", 
rowBatch.hasNext());
+            List<Object> actualRow = rowBatch.next();
+            assertEquals(ipStrings[i], actualRow.get(0));
+            assertEquals(ipStrings[i], actualRow.get(1));
+            assertEquals(ipStrings[i], actualRow.get(2));
+        }
+
+        // Ensure no more rows exist
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
         thrown.expectMessage(startsWith("Get row offset:"));
@@ -1021,15 +1027,16 @@ public class RowBatchTest {
 
     @Test
     public void testIPv6() throws DorisException, IOException {
-
-        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
-        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        ImmutableList<Field> childrenFields =
+                ImmutableList.of(
+                        new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null),
+                        new Field("k2", FieldType.nullable(new 
ArrowType.Utf8()), null));
 
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(
-                        new org.apache.arrow.vector.types.pojo.Schema(
-                                childrenBuilder.build(), null),
+                        new 
org.apache.arrow.vector.types.pojo.Schema(childrenFields, null),
                         new RootAllocator(Integer.MAX_VALUE));
+
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ArrowStreamWriter arrowStreamWriter =
                 new ArrowStreamWriter(
@@ -1038,52 +1045,55 @@ public class RowBatchTest {
         arrowStreamWriter.start();
         root.setRowCount(13);
 
-        FieldVector vector = root.getVector("k1");
-        VarCharVector ipv6Vector = (VarCharVector) vector;
+        VarCharVector ipv6Vector = (VarCharVector) root.getVector("k1");
+        VarCharVector ipv6Vector1 = (VarCharVector) root.getVector("k2");
         ipv6Vector.setInitialCapacity(13);
         ipv6Vector.allocateNew();
-        ipv6Vector.setIndexDefined(0);
-        ipv6Vector.setValueLengthSafe(0, 1);
-        ipv6Vector.setSafe(0, "0".getBytes());
-
-        ipv6Vector.setIndexDefined(1);
-        ipv6Vector.setValueLengthSafe(0, 1);
-        ipv6Vector.setSafe(1, "1".getBytes());
-
-        ipv6Vector.setIndexDefined(2);
-        ipv6Vector.setSafe(2, "65535".getBytes());
-
-        ipv6Vector.setIndexDefined(3);
-        ipv6Vector.setSafe(3, "65536".getBytes());
-
-        ipv6Vector.setIndexDefined(4);
-        ipv6Vector.setSafe(4, "4294967295".getBytes());
-
-        ipv6Vector.setIndexDefined(5);
-        ipv6Vector.setSafe(5, "4294967296".getBytes());
-
-        ipv6Vector.setIndexDefined(6);
-        ipv6Vector.setSafe(6, "8589934591".getBytes());
-
-        ipv6Vector.setIndexDefined(7);
-        ipv6Vector.setSafe(7, "281470681743359".getBytes());
-
-        ipv6Vector.setIndexDefined(8);
-        ipv6Vector.setSafe(8, "281470681743360".getBytes());
-
-        ipv6Vector.setIndexDefined(9);
-        ipv6Vector.setSafe(9, "281474976710655".getBytes());
-
-        ipv6Vector.setIndexDefined(10);
-        ipv6Vector.setSafe(10, "281474976710656".getBytes());
-
-        ipv6Vector.setIndexDefined(11);
-        ipv6Vector.setSafe(11, 
"340277174624079928635746639885392347137".getBytes());
-
-        ipv6Vector.setIndexDefined(12);
-        ipv6Vector.setSafe(12, 
"340282366920938463463374607431768211455".getBytes());
+        ipv6Vector1.setInitialCapacity(13);
+        ipv6Vector1.allocateNew(13);
+
+        String[] k1Values = {
+                "0",
+                "1",
+                "65535",
+                "65536",
+                "4294967295",
+                "4294967296",
+                "8589934591",
+                "281470681743359",
+                "281470681743360",
+                "281474976710655",
+                "281474976710656",
+                "340277174624079928635746639885392347137",
+                "340282366920938463463374607431768211455"
+        };
+
+        String[] k2Values = {
+                "::",
+                "::1",
+                "::ffff",
+                "::0.1.0.0",
+                "::255.255.255.255",
+                "::1:0:0",
+                "::1:ffff:ffff",
+                "::fffe:ffff:ffff",
+                "::ffff:0.0.0.0",
+                "::ffff:255.255.255.255",
+                "::1:0:0:0",
+                "ffff::1:ffff:ffff:1",
+                "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
+        };
+
+        for (int i = 0; i < 13; i++) {
+            ipv6Vector.setIndexDefined(i);
+            ipv6Vector.setSafe(i, k1Values[i].getBytes());
+
+            ipv6Vector1.setIndexDefined(i);
+            ipv6Vector1.setSafe(i, k2Values[i].getBytes());
+        }
 
-        vector.setValueCount(13);
+        ipv6Vector.setValueCount(13);
+        ipv6Vector1.setValueCount(13);
         arrowStreamWriter.writeBatch();
 
         arrowStreamWriter.end();
@@ -1098,63 +1108,36 @@ public class RowBatchTest {
 
         String schemaStr =
                 "{\"properties\":["
-                        + 
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}"
+                        + 
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"},"
+                        + 
"{\"type\":\"IPV6\",\"name\":\"k2\",\"comment\":\"\"}"
                         + "], \"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
         RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow0 = rowBatch.next();
-        assertEquals("::", actualRow0.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow1 = rowBatch.next();
-        assertEquals("::1", actualRow1.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow2 = rowBatch.next();
-        assertEquals("::ffff", actualRow2.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow3 = rowBatch.next();
-        assertEquals("::0.1.0.0", actualRow3.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow4 = rowBatch.next();
-        assertEquals("::255.255.255.255", actualRow4.get(0));
 
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow5 = rowBatch.next();
-        assertEquals("::1:0:0", actualRow5.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow6 = rowBatch.next();
-        assertEquals("::1:ffff:ffff", actualRow6.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow7 = rowBatch.next();
-        assertEquals("::fffe:ffff:ffff", actualRow7.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow8 = rowBatch.next();
-        assertEquals("::ffff:0.0.0.0", actualRow8.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow9 = rowBatch.next();
-        assertEquals("::ffff:255.255.255.255", actualRow9.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow10 = rowBatch.next();
-        assertEquals("::1:0:0:0", actualRow10.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow11 = rowBatch.next();
-        assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0));
-
-        Assert.assertTrue(rowBatch.hasNext());
-        List<Object> actualRow12 = rowBatch.next();
-        assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", 
actualRow12.get(0));
+        String[][] expectedResults = {
+                {"::", "::"},
+                {"::1", "::1"},
+                {"::ffff", "::ffff"},
+                {"::0.1.0.0", "::0.1.0.0"},
+                {"::255.255.255.255", "::255.255.255.255"},
+                {"::1:0:0", "::1:0:0"},
+                {"::1:ffff:ffff", "::1:ffff:ffff"},
+                {"::fffe:ffff:ffff", "::fffe:ffff:ffff"},
+                {"::ffff:0.0.0.0", "::ffff:0.0.0.0"},
+                {"::ffff:255.255.255.255", "::ffff:255.255.255.255"},
+                {"::1:0:0:0", "::1:0:0:0"},
+                {"ffff::1:ffff:ffff:1", "ffff::1:ffff:ffff:1"},
+                {"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", 
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"}
+        };
+
+        for (String[] expectedResult : expectedResults) {
+            Assert.assertTrue(rowBatch.hasNext());
+            List<Object> actualRow = rowBatch.next();
+            assertEquals(expectedResult[0], actualRow.get(0));
+            assertEquals(expectedResult[1], actualRow.get(1));
+        }
 
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to