This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c94c331  fix buffer memory grow bug (#203)
c94c331 is described below

commit c94c3311e5f0c4e4b80300e808c4a1209e24efba
Author: wudi <676366...@qq.com>
AuthorDate: Sun Oct 8 14:32:01 2023 +0800

    fix buffer memory grow bug (#203)
    
    Co-authored-by: wudi <>
---
 .../doris/flink/sink/batch/BatchRecordBuffer.java  | 20 ++++++++--------
 .../flink/sink/batch/TestBatchRecordBuffer.java    | 27 ++++++++++++++++++----
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 99876bb..5fa601d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.flink.sink.batch;
 
-import org.apache.doris.flink.sink.writer.RecordBuffer;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,22 +59,25 @@ public class BatchRecordBuffer {
 
     @VisibleForTesting
     public void ensureCapacity(int length) {
-        if(buffer.remaining() >= length){
+        int lineDelimiterSize = this.lineDelimiter.length;
+        if(buffer.remaining() - lineDelimiterSize >= length){
             return;
         }
         int currentRemain = buffer.remaining();
         int currentCapacity = buffer.capacity();
-
-        int target = buffer.remaining() + length;
-        int capacity = buffer.capacity();
-        //grow 512kb each time
-        target = Math.max(target, Math.min(capacity + 512 * 1024, capacity * 2));
-        ByteBuffer tmp = ByteBuffer.allocate(target);
+        // add lineDelimiter length
+        int needed = length - buffer.remaining() + lineDelimiterSize;
+        // grow at least 1MB
+        long grow = Math.max(needed, 1024 * 1024);
+        // grow at least 50% of the current size
+        grow = Math.max(buffer.capacity() / 2, grow);
+        int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
+        ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
         buffer.flip();
         tmp.put(buffer);
         buffer.clear();
         buffer = tmp;
-        LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, target);
+        LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, newCapacity);
     }
 
     public String getLabelName() {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
index 18cb79a..1139358 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
@@ -55,21 +55,38 @@ public class TestBatchRecordBuffer {
         BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),1);
         recordBuffer.ensureCapacity(10);
 
-        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 10 + 1);
+        //grow at least 1MB
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1);
 
         recordBuffer.ensureCapacity(100);
-        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 100 + 11);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1);
 
         recordBuffer.ensureCapacity(1024);
-        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 + 111);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1);
 
+        //not need grow
         recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),16);
-        recordBuffer.ensureCapacity(16);
+        recordBuffer.ensureCapacity(15);
         Assert.assertEquals(recordBuffer.getBuffer().capacity(), 16);
 
         recordBuffer.insert("1234567890".getBytes(StandardCharsets.UTF_8));
         recordBuffer.ensureCapacity(8);
-        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 32);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 16);
+
+        recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),10);
+        recordBuffer.insert("1234567".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.insert("123456789012345678901234567890".getBytes(StandardCharsets.UTF_8));
+
+        //
+        recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),10);
+        recordBuffer.ensureCapacity(2 * 1024 * 1024);
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 2 * 1024 * 1024 - 10 + 1 + 10);
+
+        //grow at least 50% of the current size
+        recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),5 * 1024 * 1024);
+        recordBuffer.insert(ByteBuffer.allocate(2 * 1024 * 1024).array());
+        recordBuffer.insert(ByteBuffer.allocate(3 * 1024 * 1024 + 1).array());
+        Assert.assertEquals(recordBuffer.getBuffer().capacity(), 5 * 1024 * 1024 + 5 * 1024 * 1024 / 2);
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to