This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 118bea95 [Improve] add buffer-map read-write-lock in batch mode (#555)
118bea95 is described below

commit 118bea95710d11ef00975ffaeb32eeb9e1d3d683
Author: nativeCat <48559636+native...@users.noreply.github.com>
AuthorDate: Tue Feb 18 15:42:40 2025 +0800

    [Improve] add buffer-map read-write-lock in batch mode (#555)
    
    dorisBatchStreamLoad.bufferMap get/remove operation add read-write lock.
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 479fab64..267a121c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -63,7 +63,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -111,6 +113,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private final AtomicLong currentCacheBytes = new AtomicLong(0L);
     private final Lock lock = new ReentrantLock();
     private final Condition block = lock.newCondition();
+    private final Map<String, ReadWriteLock> bufferMapLock = new 
ConcurrentHashMap<>();
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -181,7 +184,7 @@ public class DorisBatchStreamLoad implements Serializable {
     public void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
-
+        getLock(bufferKey).readLock().lock();
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
                         bufferKey,
@@ -194,6 +197,7 @@ public class DorisBatchStreamLoad implements Serializable {
 
         int bytes = buffer.insert(record);
         currentCacheBytes.addAndGet(bytes);
+        getLock(bufferKey).readLock().unlock();
         if (currentCacheBytes.get() > maxBlockedBytes) {
             lock.lock();
             try {
@@ -283,11 +287,19 @@ public class DorisBatchStreamLoad implements Serializable 
{
     }
 
     private synchronized void flushBuffer(String bufferKey) {
-        BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+        BatchRecordBuffer buffer;
+        try {
+            getLock(bufferKey).writeLock().lock();
+            buffer = bufferMap.remove(bufferKey);
+        } finally {
+            getLock(bufferKey).writeLock().unlock();
+        }
+        if (buffer == null) {
+            return;
+        }
         
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
         LOG.debug("flush buffer for key {} with label {}", bufferKey, 
buffer.getLabelName());
         putRecordToFlushQueue(buffer);
-        bufferMap.remove(bufferKey);
     }
 
     private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
@@ -374,6 +386,10 @@ public class DorisBatchStreamLoad implements Serializable {
         return true;
     }
 
+    private ReadWriteLock getLock(String bufferKey) {
+        return bufferMapLock.computeIfAbsent(bufferKey, k -> new 
ReentrantReadWriteLock());
+    }
+
     class LoadAsyncExecutor implements Runnable {
 
         private int flushQueueSize;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to