This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 654ec15fff5 [Fix](ecosystem) Fix kettle plugin load field 
correspondence problem (#48412)
654ec15fff5 is described below

commit 654ec15fff5dc6d76679a7f1ed941896974f17dd
Author: wudi <w...@selectdb.com>
AuthorDate: Thu Feb 27 16:16:53 2025 +0800

    [Fix](ecosystem) Fix kettle plugin load field correspondence problem 
(#48412)
    
    ### What problem does this PR solve?
    
    Issue Number: close #48302
    
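    1. Fix the Kettle plugin's field-correspondence (mapping) problem during
       load: the record serializer now uses the configured table field names
       instead of the input row's field names.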
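    2. Optimize the Stream Load write logic: guard buffer writes and flushes
       with per-table read/write locks, re-check for load errors after a
       buffer is queued for flushing, and use a new label plus a back-off
       sleep on each retry.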
---
 .../steps/dorisstreamloader/DorisStreamLoader.java | 28 ++++++++-----
 .../load/DorisBatchStreamLoad.java                 | 47 ++++++++++++++++------
 .../steps/dorisstreamloader/load/DorisOptions.java |  2 +-
 3 files changed, 53 insertions(+), 24 deletions(-)

diff --git 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
 
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
index 91b546baf21..1303d628265 100644
--- 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
+++ 
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
@@ -72,6 +72,7 @@ public class DorisStreamLoader extends BaseStep implements 
StepInterface {
         return false;
       }
       if ( first ) {
+        logDebug("First process row with meta : " + meta.toString());
         first = false;
         // Cache field indexes.
         data.keynrs = new int[meta.getFieldStream().length];
@@ -84,11 +85,15 @@ public class DorisStreamLoader extends BaseStep implements 
StepInterface {
           data.formatMeta[i] = sourceMeta.clone();
         }
 
+        // use field table name to serializer data
+        String[] fieldNames = new String[meta.getFieldTable().length];
+        System.arraycopy(meta.getFieldTable(), 0, fieldNames, 0, 
meta.getFieldTable().length);
+
         Properties loadProperties = options.getStreamLoadProp();
         //builder serializer
         data.serializer = DorisRecordSerializer.builder()
                 .setType(loadProperties.getProperty(FORMAT_KEY, CSV))
-                .setFieldNames(getInputRowMeta().getFieldNames())
+                .setFieldNames(fieldNames)
                 .setFormatMeta(data.formatMeta)
                 
.setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, 
FIELD_DELIMITER_DEFAULT))
                 .setLogChannelInterface(log)
@@ -136,17 +141,18 @@ public class DorisStreamLoader extends BaseStep 
implements StepInterface {
           }
         }
       }
+
       options = DorisOptions.builder()
-              .withFenodes(meta.getFenodes())
-              .withDatabase(meta.getDatabase())
-              .withTable(meta.getTable())
-              .withUsername(meta.getUsername())
-              .withPassword(meta.getPassword())
-              .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes())
-              .withBufferFlushMaxRows(meta.getBufferFlushMaxRows())
-              .withMaxRetries(meta.getMaxRetries())
-              .withStreamLoadProp(streamHeaders)
-              .withDeletable(meta.isDeletable()).build();
+          .withFenodes(meta.getFenodes())
+          .withDatabase(meta.getDatabase())
+          .withTable(meta.getTable())
+          .withUsername(meta.getUsername())
+          .withPassword(meta.getPassword())
+          .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes())
+          .withBufferFlushMaxRows(meta.getBufferFlushMaxRows())
+          .withMaxRetries(meta.getMaxRetries())
+          .withStreamLoadProp(streamHeaders)
+          .withDeletable(meta.isDeletable()).build();
 
       logDetailed("Initializing step with options: " + options.toString());
       streamLoad = new DorisBatchStreamLoad(options, log);
diff --git 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
 
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
index 226f5f61e73..de5897e2cdd 100644
--- 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
+++ 
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
@@ -49,7 +49,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import static 
org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.ARROW;
@@ -93,6 +95,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private final AtomicLong currentCacheBytes = new AtomicLong(0L);
     private final Lock lock = new ReentrantLock();
     private final Condition block = lock.newCondition();
+    private final Map<String, ReadWriteLock> bufferMapLock = new 
ConcurrentHashMap<>();
     private final int FLUSH_QUEUE_SIZE = 2;
     private DorisOptions options;
     private LogChannelInterface log;
@@ -188,6 +191,8 @@ public class DorisBatchStreamLoad implements Serializable {
     public void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
+
+        getLock(bufferKey).readLock().lock();
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
                         bufferKey,
@@ -200,6 +205,8 @@ public class DorisBatchStreamLoad implements Serializable {
 
         int bytes = buffer.insert(record);
         currentCacheBytes.addAndGet(bytes);
+        getLock(bufferKey).readLock().unlock();
+
         if (currentCacheBytes.get() > maxBlockedBytes) {
             lock.lock();
             try {
@@ -241,7 +248,7 @@ public class DorisBatchStreamLoad implements Serializable {
      * Force flush and wait for success.
      * @return
      */
-    public  boolean forceFlush() {
+    public boolean forceFlush() {
         return doFlush(null, true, false);
     }
 
@@ -259,8 +266,9 @@ public class DorisBatchStreamLoad implements Serializable {
     }
 
     private synchronized boolean flush(String bufferKey, boolean waitUtilDone) 
{
-        if (bufferMap.isEmpty()) {
+        if (!waitUtilDone && bufferMap.isEmpty()) {
             // bufferMap may have been flushed by other threads
+            log.logDetailed("bufferMap is empty, no need to flush {}", 
bufferKey);
             return false;
         }
         if (null == bufferKey) {
@@ -288,12 +296,21 @@ public class DorisBatchStreamLoad implements Serializable 
{
     }
 
     private synchronized void flushBuffer(String bufferKey) {
-        BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+        BatchRecordBuffer buffer;
+        try {
+            getLock(bufferKey).writeLock().lock();
+            buffer = bufferMap.remove(bufferKey);
+        } finally {
+            getLock(bufferKey).writeLock().unlock();
+        }
+        if (buffer == null) {
+            log.logDetailed("buffer key is not exist {}, skipped", bufferKey);
+            return;
+        }
         String label = String.format("%s_%s_%s", "kettle", buffer.getTable(), 
UUID.randomUUID());
         buffer.setLabelName(label);
         log.logDetailed("Flush buffer, table " + bufferKey + ", records " + 
buffer.getNumOfRecords());
         putRecordToFlushQueue(buffer);
-        bufferMap.remove(bufferKey);
     }
 
     private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
@@ -306,6 +323,9 @@ public class DorisBatchStreamLoad implements Serializable {
         } catch (InterruptedException e) {
             throw new RuntimeException("Failed to put record buffer to flush 
queue");
         }
+        // When the load thread reports an error, the flushQueue will be 
cleared,
+        // and need to force a check for the exception.
+        checkFlushException();
     }
 
     private void checkFlushException() {
@@ -317,7 +337,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private void waitAsyncLoadFinish() {
         // Because the flush thread will drainTo once after polling is 
completed
         // if queue_size is 2, at least 4 empty queues must be consumed to 
ensure that flush has been completed
-        for (int i = 0; i < FLUSH_QUEUE_SIZE * 2; i++) {
+        for (int i = 0; i < FLUSH_QUEUE_SIZE * 2 + 1; i++) {
             BatchRecordBuffer empty = new BatchRecordBuffer();
             putRecordToFlushQueue(empty);
         }
@@ -331,8 +351,6 @@ public class DorisBatchStreamLoad implements Serializable {
         // close async executor
         this.loadExecutorService.shutdown();
         this.started.set(false);
-        // clear buffer
-        this.flushQueue.clear();
     }
 
     public boolean mergeBuffer(List<BatchRecordBuffer> recordList, 
BatchRecordBuffer buffer) {
@@ -381,6 +399,10 @@ public class DorisBatchStreamLoad implements Serializable {
         return true;
     }
 
+    private ReadWriteLock getLock(String bufferKey) {
+        return bufferMapLock.computeIfAbsent(bufferKey, k -> new 
ReentrantReadWriteLock());
+    }
+
     class LoadAsyncExecutor implements Runnable {
 
         private int flushQueueSize;
@@ -482,11 +504,6 @@ public class DorisBatchStreamLoad implements Serializable {
                                     lock.unlock();
                                 }
                                 return;
-                            } else if (LoadStatus.LABEL_ALREADY_EXIST.equals(
-                                    respContent.getStatus())) {
-                                // todo: need to abort transaction when 
JobStatus not finished
-                                putBuilder.setLabel(label + "_" + retry);
-                                reason = respContent.getMessage();
                             } else {
                                 String errMsg = null;
                                 if 
(StringUtils.isBlank(respContent.getMessage())
@@ -523,6 +540,12 @@ public class DorisBatchStreamLoad implements Serializable {
                 // get available backend retry
                 refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
                 putBuilder.setUrl(loadUrl);
+                putBuilder.setLabel(label + "_" + retry);
+
+                try {
+                    Thread.sleep(1000L * retry);
+                } catch (InterruptedException e) {
+                }
             }
             buffer.clear();
             buffer = null;
diff --git 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
 
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
index 202b996cafe..2d6489dc48d 100644
--- 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
+++ 
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
@@ -179,7 +179,7 @@ public class DorisOptions {
             Preconditions.checkArgument(database != null, "Database must not 
be null");
             Preconditions.checkArgument(table != null, "Table must not be 
null");
             Preconditions.checkArgument(bufferFlushMaxRows >= 10000, 
"BufferFlushMaxRows must be greater than 10000");
-            Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 
1024, "BufferFlushMaxBytes must be greater than 10MB");
+            Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 
1024, "BufferFlushMaxBytes must be greater than 10485760(10MB)");
             Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be 
greater than 0");
             return new DorisOptions(fenodes, username, password, database, 
table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries, 
deletable);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to