This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git


The following commit(s) were added to refs/heads/main by this push:
     new c2234549 GH-729: [JDBC] Fix BinaryConsumer consuming null value (#730)
c2234549 is described below

commit c2234549827488d32b6698f91e26053a293d808e
Author: wangyunlai <[email protected]>
AuthorDate: Mon Apr 28 14:23:46 2025 +0800

    GH-729: [JDBC] Fix BinaryConsumer consuming null value (#730)
    
    ## What's Changed
    
    Set the `startOffset` of the next item when `BinaryConsumer` consumes a
    `null` value.
    
    Closes #729.
---
 .../adapter/jdbc/consumer/BinaryConsumer.java      | 25 +++++++++++-----------
 .../adapter/jdbc/consumer/BinaryConsumerTest.java  | 11 +++++++++-
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
index edbc6360..73ec04b8 100644
--- a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
+++ b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
@@ -51,13 +51,15 @@ public abstract class BinaryConsumer extends BaseConsumer<VarBinaryVector> {
 
   /** consume a InputStream. */
   public void consume(InputStream is) throws IOException {
+    while (currentIndex >= vector.getValueCapacity()) {
+      vector.reallocValidityAndOffsetBuffers();
+    }
+
+    final int startOffset = vector.getStartOffset(currentIndex);
+    final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
+    int dataLength = 0;
+
     if (is != null) {
-      while (currentIndex >= vector.getValueCapacity()) {
-        vector.reallocValidityAndOffsetBuffers();
-      }
-      final int startOffset = vector.getStartOffset(currentIndex);
-      final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
-      int dataLength = 0;
       int read;
       while ((read = is.read(reuseBytes)) != -1) {
         while (vector.getDataBuffer().capacity() < (startOffset + dataLength + 
read)) {
@@ -66,11 +68,12 @@ public abstract class BinaryConsumer extends BaseConsumer<VarBinaryVector> {
         vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes, 
0, read);
         dataLength += read;
       }
-      offsetBuffer.setInt(
-          (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), 
startOffset + dataLength);
+
       BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
-      vector.setLastSet(currentIndex);
     }
+    offsetBuffer.setInt(
+        (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), 
startOffset + dataLength);
+    vector.setLastSet(currentIndex);
   }
 
   public void moveWriterPosition() {
@@ -95,9 +98,7 @@ public abstract class BinaryConsumer extends BaseConsumer<VarBinaryVector> {
     @Override
     public void consume(ResultSet resultSet) throws SQLException, IOException {
       InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
-      if (!resultSet.wasNull()) {
-        consume(is);
-      }
+      consume(is);
       moveWriterPosition();
     }
   }
diff --git a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
index b1e25379..bb836578 100644
--- a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
+++ b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import org.apache.arrow.vector.BaseValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.junit.jupiter.api.Test;
@@ -65,7 +66,11 @@ public class BinaryConsumerTest extends AbstractConsumerTest {
         nullable,
         binaryConsumer -> {
           for (byte[] value : values) {
-            binaryConsumer.consume(new ByteArrayInputStream(value));
+            if (value != null) {
+              binaryConsumer.consume(new ByteArrayInputStream(value));
+            } else {
+              binaryConsumer.consume((InputStream) null);
+            }
             binaryConsumer.moveWriterPosition();
           }
         },
@@ -119,5 +124,9 @@ public class BinaryConsumerTest extends AbstractConsumerTest {
       testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT);
     }
     testConsumeInputStream(testRecords, false);
+
+    byte[] bytes1 = new byte[] {1, 2, 3};
+    byte[] bytes2 = new byte[] {4, 5, 6};
+    testConsumeInputStream(new byte[][] {bytes1, null, bytes2}, true);
   }
 }

Reply via email to