This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new c94c331 fix buffer memory grow bug (#203) c94c331 is described below commit c94c3311e5f0c4e4b80300e808c4a1209e24efba Author: wudi <676366...@qq.com> AuthorDate: Sun Oct 8 14:32:01 2023 +0800 fix buffer memory grow bug (#203) Co-authored-by: wudi <> --- .../doris/flink/sink/batch/BatchRecordBuffer.java | 20 ++++++++-------- .../flink/sink/batch/TestBatchRecordBuffer.java | 27 ++++++++++++++++++---- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java index 99876bb..5fa601d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java @@ -17,7 +17,6 @@ package org.apache.doris.flink.sink.batch; -import org.apache.doris.flink.sink.writer.RecordBuffer; import org.apache.flink.annotation.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,22 +59,25 @@ public class BatchRecordBuffer { @VisibleForTesting public void ensureCapacity(int length) { - if(buffer.remaining() >= length){ + int lineDelimiterSize = this.lineDelimiter.length; + if(buffer.remaining() - lineDelimiterSize >= length){ return; } int currentRemain = buffer.remaining(); int currentCapacity = buffer.capacity(); - - int target = buffer.remaining() + length; - int capacity = buffer.capacity(); - //grow 512kb each time - target = Math.max(target, Math.min(capacity + 512 * 1024, capacity * 2)); - ByteBuffer tmp = ByteBuffer.allocate(target); + // add lineDelimiter length + int needed = length - buffer.remaining() + lineDelimiterSize; + // grow at least 1MB + long grow = Math.max(needed, 1024 * 1024); + // grow at least 50% of the current size + grow = Math.max(buffer.capacity() / 2, grow); + int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow); + ByteBuffer tmp = ByteBuffer.allocate(newCapacity); buffer.flip(); tmp.put(buffer); buffer.clear(); buffer = tmp; - LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, target); + LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, newCapacity); } public String getLabelName() { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java index 18cb79a..1139358 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java @@ -55,21 +55,38 @@ public class TestBatchRecordBuffer { BatchRecordBuffer recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),1); recordBuffer.ensureCapacity(10); - Assert.assertEquals(recordBuffer.getBuffer().capacity(), 10 + 1); + //grow at least 1MB + Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1); recordBuffer.ensureCapacity(100); - Assert.assertEquals(recordBuffer.getBuffer().capacity(), 100 + 11); + Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1); recordBuffer.ensureCapacity(1024); - Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 + 111); + Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 1); + //not need grow recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),16); - recordBuffer.ensureCapacity(16); + recordBuffer.ensureCapacity(15); Assert.assertEquals(recordBuffer.getBuffer().capacity(), 16); recordBuffer.insert("1234567890".getBytes(StandardCharsets.UTF_8)); recordBuffer.ensureCapacity(8); - Assert.assertEquals(recordBuffer.getBuffer().capacity(), 32); + Assert.assertEquals(recordBuffer.getBuffer().capacity(), 1024 * 1024 + 16); + + recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),10); + recordBuffer.insert("1234567".getBytes(StandardCharsets.UTF_8)); + recordBuffer.insert("123456789012345678901234567890".getBytes(StandardCharsets.UTF_8)); + + // + recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),10); + recordBuffer.ensureCapacity(2 * 1024 * 1024); + Assert.assertEquals(recordBuffer.getBuffer().capacity(), 2 * 1024 * 1024 - 10 + 1 + 10); + + //grow at least 50% of the current size + recordBuffer = new BatchRecordBuffer("\n".getBytes(StandardCharsets.UTF_8),5 * 1024 * 1024); + recordBuffer.insert(ByteBuffer.allocate(2 * 1024 * 1024).array()); + recordBuffer.insert(ByteBuffer.allocate(3 * 1024 * 1024 + 1).array()); + Assert.assertEquals(recordBuffer.getBuffer().capacity(), 5 * 1024 * 1024 + 5 * 1024 * 1024 / 2); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org