This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 118bea95 [Improve] add buffer-map read-write-lock in batch mode (#555)
118bea95 is described below

commit 118bea95710d11ef00975ffaeb32eeb9e1d3d683
Author: nativeCat <48559636+native...@users.noreply.github.com>
AuthorDate: Tue Feb 18 15:42:40 2025 +0800

    [Improve] add buffer-map read-write-lock in batch mode (#555)

    dorisBatchStreamLoad.bufferMap get/remove operation add read-write lock.
---
 .../flink/sink/batch/DorisBatchStreamLoad.java | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 479fab64..267a121c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -63,7 +63,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -111,6 +113,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private final AtomicLong currentCacheBytes = new AtomicLong(0L);
     private final Lock lock = new ReentrantLock();
     private final Condition block = lock.newCondition();
+    private final Map<String, ReadWriteLock> bufferMapLock = new ConcurrentHashMap<>();
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -181,7 +184,7 @@ public class DorisBatchStreamLoad implements Serializable {
     public void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
-
+        getLock(bufferKey).readLock().lock();
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
                         bufferKey,
@@ -194,6 +197,7 @@ public class DorisBatchStreamLoad implements Serializable {
 
         int bytes = buffer.insert(record);
         currentCacheBytes.addAndGet(bytes);
+        getLock(bufferKey).readLock().unlock();
         if (currentCacheBytes.get() > maxBlockedBytes) {
             lock.lock();
             try {
@@ -283,11 +287,19 @@ public class DorisBatchStreamLoad implements Serializable {
     }
 
     private synchronized void flushBuffer(String bufferKey) {
-        BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+        BatchRecordBuffer buffer;
+        try {
+            getLock(bufferKey).writeLock().lock();
+            buffer = bufferMap.remove(bufferKey);
+        } finally {
+            getLock(bufferKey).writeLock().unlock();
+        }
+        if (buffer == null) {
+            return;
+        }
         buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
         LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName());
         putRecordToFlushQueue(buffer);
-        bufferMap.remove(bufferKey);
     }
 
     private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
@@ -374,6 +386,10 @@ public class DorisBatchStreamLoad implements Serializable {
         return true;
     }
 
+    private ReadWriteLock getLock(String bufferKey) {
+        return bufferMapLock.computeIfAbsent(bufferKey, k -> new ReentrantReadWriteLock());
+    }
+
     class LoadAsyncExecutor implements Runnable {
 
         private int flushQueueSize;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org