This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3e6e0ab5 [Fix](batch-stream-load) handle types that do not have line
delimiters (#619)
3e6e0ab5 is described below
commit 3e6e0ab5ff438b6ca3e3c6693008c37d1fd07990
Author: Alex Riedler <[email protected]>
AuthorDate: Mon Nov 17 22:42:59 2025 -0500
[Fix](batch-stream-load) handle types that do not have line delimiters
(#619)
---
.../flink/sink/batch/DorisBatchStreamLoad.java | 2 +-
.../flink/sink/batch/TestDorisBatchStreamLoad.java | 51 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 136c407e..a7d7c8c6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -380,7 +380,7 @@ public class DorisBatchStreamLoad implements Serializable {
if (buffer.getBuffer().isEmpty()) {
return false;
}
- if (!mergeBuffer.getBuffer().isEmpty()) {
+ if (!mergeBuffer.getBuffer().isEmpty() && mergeBuffer.getLineDelimiter() != null) {
mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
mergeBuffer.setBufferSizeBytes(
mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index 784334aa..a7998397 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -26,6 +26,7 @@ import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.HttpTestUtil;
import org.apache.doris.flink.sink.TestUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
@@ -45,6 +46,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
import static org.mockito.ArgumentMatchers.any;
@@ -226,4 +228,53 @@ public class TestDorisBatchStreamLoad {
flag = loader.mergeBuffer(bufferList, buffer);
Assert.assertEquals(false, flag);
}
+
+ @Test
+ public void mergeBufferNullDelimiterTest() {
+ DorisReadOptions readOptions = DorisReadOptions.builder().build();
+ Properties streamProperties = new Properties();
+ streamProperties.setProperty(
+ LoadConstants.FORMAT_KEY, LoadConstants.ARROW); // this makes lineDelimiter null
+ DorisExecutionOptions executionOptions =
+ DorisExecutionOptions.builder().setStreamLoadProp(streamProperties).build();
+ DorisOptions options =
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setBenodes("127.0.0.1:9030")
+ .setTableIdentifier("db.tbl")
+ .build();
+
+ DorisBatchStreamLoad loader =
+ new DorisBatchStreamLoad(
+ options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
+
+ List<BatchRecordBuffer> bufferList = new ArrayList<>();
+ BatchRecordBuffer recordBuffer = new BatchRecordBuffer("db", "tbl", null, 0);
+ recordBuffer.insert("111".getBytes(StandardCharsets.UTF_8));
+ recordBuffer.setLabelName("label2");
+ BatchRecordBuffer buffer = new BatchRecordBuffer("db", "tbl", null, 0);
+ buffer.insert("222".getBytes(StandardCharsets.UTF_8));
+ buffer.setLabelName("label1");
+
+ boolean flag = loader.mergeBuffer(bufferList, buffer);
+ Assert.assertEquals(false, flag);
+
+ bufferList.add(buffer);
+ bufferList.add(recordBuffer);
+ flag = loader.mergeBuffer(bufferList, buffer);
+ Assert.assertEquals(true, flag);
+ byte[] bytes = mergeByteArrays(buffer.getBuffer());
+ Assert.assertArrayEquals(bytes, "222111".getBytes(StandardCharsets.UTF_8));
+
+ // multi table
+ bufferList.clear();
+ bufferList.add(buffer);
+ BatchRecordBuffer recordBuffer2 =
+ new BatchRecordBuffer("db", "tbl2", "\n".getBytes(StandardCharsets.UTF_8), 0);
+ recordBuffer2.insert("333".getBytes(StandardCharsets.UTF_8));
+ recordBuffer2.setLabelName("label3");
+ bufferList.add(recordBuffer2);
+ flag = loader.mergeBuffer(bufferList, buffer);
+ Assert.assertEquals(false, flag);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]