This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e6e0ab5 [Fix](batch-stream-load) handle types that do not have line 
delimiters (#619)
3e6e0ab5 is described below

commit 3e6e0ab5ff438b6ca3e3c6693008c37d1fd07990
Author: Alex Riedler <[email protected]>
AuthorDate: Mon Nov 17 22:42:59 2025 -0500

    [Fix](batch-stream-load) handle types that do not have line delimiters 
(#619)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  2 +-
 .../flink/sink/batch/TestDorisBatchStreamLoad.java | 51 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 136c407e..a7d7c8c6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -380,7 +380,7 @@ public class DorisBatchStreamLoad implements Serializable {
         if (buffer.getBuffer().isEmpty()) {
             return false;
         }
-        if (!mergeBuffer.getBuffer().isEmpty()) {
+        if (!mergeBuffer.getBuffer().isEmpty() && 
mergeBuffer.getLineDelimiter() != null) {
             mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
             mergeBuffer.setBufferSizeBytes(
                     mergeBuffer.getBufferSizeBytes() + 
mergeBuffer.getLineDelimiter().length);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index 784334aa..a7998397 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -26,6 +26,7 @@ import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.TestUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.doris.flink.sink.writer.LoadConstants;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
@@ -45,6 +46,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import static 
org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
 import static org.mockito.ArgumentMatchers.any;
@@ -226,4 +228,53 @@ public class TestDorisBatchStreamLoad {
         flag = loader.mergeBuffer(bufferList, buffer);
         Assert.assertEquals(false, flag);
     }
+
+    @Test
+    public void mergeBufferNullDelimiterTest() {
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+        Properties streamProperties = new Properties();
+        streamProperties.setProperty(
+                LoadConstants.FORMAT_KEY, LoadConstants.ARROW); // this makes 
lineDelimiter null
+        DorisExecutionOptions executionOptions =
+                
DorisExecutionOptions.builder().setStreamLoadProp(streamProperties).build();
+        DorisOptions options =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setBenodes("127.0.0.1:9030")
+                        .setTableIdentifier("db.tbl")
+                        .build();
+
+        DorisBatchStreamLoad loader =
+                new DorisBatchStreamLoad(
+                        options, readOptions, executionOptions, new 
LabelGenerator("xx", false), 0);
+
+        List<BatchRecordBuffer> bufferList = new ArrayList<>();
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer("db", "tbl", 
null, 0);
+        recordBuffer.insert("111".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.setLabelName("label2");
+        BatchRecordBuffer buffer = new BatchRecordBuffer("db", "tbl", null, 0);
+        buffer.insert("222".getBytes(StandardCharsets.UTF_8));
+        buffer.setLabelName("label1");
+
+        boolean flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(false, flag);
+
+        bufferList.add(buffer);
+        bufferList.add(recordBuffer);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(true, flag);
+        byte[] bytes = mergeByteArrays(buffer.getBuffer());
+        Assert.assertArrayEquals(bytes, 
"222111".getBytes(StandardCharsets.UTF_8));
+
+        // multi table
+        bufferList.clear();
+        bufferList.add(buffer);
+        BatchRecordBuffer recordBuffer2 =
+                new BatchRecordBuffer("db", "tbl2", 
"\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer2.insert("333".getBytes(StandardCharsets.UTF_8));
+        recordBuffer2.setLabelName("label3");
+        bufferList.add(recordBuffer2);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(false, flag);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to