This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 654ec15fff5 [Fix](ecosystem) Fix kettle plugin load field correspondence problem (#48412) 654ec15fff5 is described below commit 654ec15fff5dc6d76679a7f1ed941896974f17dd Author: wudi <w...@selectdb.com> AuthorDate: Thu Feb 27 16:16:53 2025 +0800 [Fix](ecosystem) Fix kettle plugin load field correspondence problem (#48412) ### What problem does this PR solve? Issue Number: close #48302 1. Fix kettle plugin load field correspondence problem 2. Optimize some write logic of streamload --- .../steps/dorisstreamloader/DorisStreamLoader.java | 28 ++++++++----- .../load/DorisBatchStreamLoad.java | 47 ++++++++++++++++------ .../steps/dorisstreamloader/load/DorisOptions.java | 2 +- 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java index 91b546baf21..1303d628265 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java @@ -72,6 +72,7 @@ public class DorisStreamLoader extends BaseStep implements StepInterface { return false; } if ( first ) { + logDebug("First process row with meta : " + meta.toString()); first = false; // Cache field indexes. 
data.keynrs = new int[meta.getFieldStream().length]; @@ -84,11 +85,15 @@ public class DorisStreamLoader extends BaseStep implements StepInterface { data.formatMeta[i] = sourceMeta.clone(); } + // use field table name to serializer data + String[] fieldNames = new String[meta.getFieldTable().length]; + System.arraycopy(meta.getFieldTable(), 0, fieldNames, 0, meta.getFieldTable().length); + Properties loadProperties = options.getStreamLoadProp(); //builder serializer data.serializer = DorisRecordSerializer.builder() .setType(loadProperties.getProperty(FORMAT_KEY, CSV)) - .setFieldNames(getInputRowMeta().getFieldNames()) + .setFieldNames(fieldNames) .setFormatMeta(data.formatMeta) .setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT)) .setLogChannelInterface(log) @@ -136,17 +141,18 @@ public class DorisStreamLoader extends BaseStep implements StepInterface { } } } + options = DorisOptions.builder() - .withFenodes(meta.getFenodes()) - .withDatabase(meta.getDatabase()) - .withTable(meta.getTable()) - .withUsername(meta.getUsername()) - .withPassword(meta.getPassword()) - .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes()) - .withBufferFlushMaxRows(meta.getBufferFlushMaxRows()) - .withMaxRetries(meta.getMaxRetries()) - .withStreamLoadProp(streamHeaders) - .withDeletable(meta.isDeletable()).build(); + .withFenodes(meta.getFenodes()) + .withDatabase(meta.getDatabase()) + .withTable(meta.getTable()) + .withUsername(meta.getUsername()) + .withPassword(meta.getPassword()) + .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes()) + .withBufferFlushMaxRows(meta.getBufferFlushMaxRows()) + .withMaxRetries(meta.getMaxRetries()) + .withStreamLoadProp(streamHeaders) + .withDeletable(meta.isDeletable()).build(); logDetailed("Initializing step with options: " + options.toString()); streamLoad = new DorisBatchStreamLoad(options, log); diff --git 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java index 226f5f61e73..de5897e2cdd 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java @@ -49,7 +49,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.ARROW; @@ -93,6 +95,7 @@ public class DorisBatchStreamLoad implements Serializable { private final AtomicLong currentCacheBytes = new AtomicLong(0L); private final Lock lock = new ReentrantLock(); private final Condition block = lock.newCondition(); + private final Map<String, ReadWriteLock> bufferMapLock = new ConcurrentHashMap<>(); private final int FLUSH_QUEUE_SIZE = 2; private DorisOptions options; private LogChannelInterface log; @@ -188,6 +191,8 @@ public class DorisBatchStreamLoad implements Serializable { public void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); + + getLock(bufferKey).readLock().lock(); BatchRecordBuffer buffer = bufferMap.computeIfAbsent( bufferKey, @@ -200,6 +205,8 @@ public class DorisBatchStreamLoad implements Serializable { int bytes = buffer.insert(record); currentCacheBytes.addAndGet(bytes); + getLock(bufferKey).readLock().unlock(); + if (currentCacheBytes.get() > maxBlockedBytes) { 
lock.lock(); try { @@ -241,7 +248,7 @@ public class DorisBatchStreamLoad implements Serializable { * Force flush and wait for success. * @return */ - public boolean forceFlush() { + public boolean forceFlush() { return doFlush(null, true, false); } @@ -259,8 +266,9 @@ public class DorisBatchStreamLoad implements Serializable { } private synchronized boolean flush(String bufferKey, boolean waitUtilDone) { - if (bufferMap.isEmpty()) { + if (!waitUtilDone && bufferMap.isEmpty()) { // bufferMap may have been flushed by other threads + log.logDetailed("bufferMap is empty, no need to flush {}", bufferKey); return false; } if (null == bufferKey) { @@ -288,12 +296,21 @@ public class DorisBatchStreamLoad implements Serializable { } private synchronized void flushBuffer(String bufferKey) { - BatchRecordBuffer buffer = bufferMap.get(bufferKey); + BatchRecordBuffer buffer; + try { + getLock(bufferKey).writeLock().lock(); + buffer = bufferMap.remove(bufferKey); + } finally { + getLock(bufferKey).writeLock().unlock(); + } + if (buffer == null) { + log.logDetailed("buffer key is not exist {}, skipped", bufferKey); + return; + } String label = String.format("%s_%s_%s", "kettle", buffer.getTable(), UUID.randomUUID()); buffer.setLabelName(label); log.logDetailed("Flush buffer, table " + bufferKey + ", records " + buffer.getNumOfRecords()); putRecordToFlushQueue(buffer); - bufferMap.remove(bufferKey); } private void putRecordToFlushQueue(BatchRecordBuffer buffer) { @@ -306,6 +323,9 @@ public class DorisBatchStreamLoad implements Serializable { } catch (InterruptedException e) { throw new RuntimeException("Failed to put record buffer to flush queue"); } + // When the load thread reports an error, the flushQueue will be cleared, + // and need to force a check for the exception. 
+ checkFlushException(); } private void checkFlushException() { @@ -317,7 +337,7 @@ public class DorisBatchStreamLoad implements Serializable { private void waitAsyncLoadFinish() { // Because the flush thread will drainTo once after polling is completed // if queue_size is 2, at least 4 empty queues must be consumed to ensure that flush has been completed - for (int i = 0; i < FLUSH_QUEUE_SIZE * 2; i++) { + for (int i = 0; i < FLUSH_QUEUE_SIZE * 2 + 1; i++) { BatchRecordBuffer empty = new BatchRecordBuffer(); putRecordToFlushQueue(empty); } @@ -331,8 +351,6 @@ public class DorisBatchStreamLoad implements Serializable { // close async executor this.loadExecutorService.shutdown(); this.started.set(false); - // clear buffer - this.flushQueue.clear(); } public boolean mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) { @@ -381,6 +399,10 @@ public class DorisBatchStreamLoad implements Serializable { return true; } + private ReadWriteLock getLock(String bufferKey) { + return bufferMapLock.computeIfAbsent(bufferKey, k -> new ReentrantReadWriteLock()); + } + class LoadAsyncExecutor implements Runnable { private int flushQueueSize; @@ -482,11 +504,6 @@ public class DorisBatchStreamLoad implements Serializable { lock.unlock(); } return; - } else if (LoadStatus.LABEL_ALREADY_EXIST.equals( - respContent.getStatus())) { - // todo: need to abort transaction when JobStatus not finished - putBuilder.setLabel(label + "_" + retry); - reason = respContent.getMessage(); } else { String errMsg = null; if (StringUtils.isBlank(respContent.getMessage()) @@ -523,6 +540,12 @@ public class DorisBatchStreamLoad implements Serializable { // get available backend retry refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); putBuilder.setUrl(loadUrl); + putBuilder.setLabel(label + "_" + retry); + + try { + Thread.sleep(1000L * retry); + } catch (InterruptedException e) { + } } buffer.clear(); buffer = null; diff --git 
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java index 202b996cafe..2d6489dc48d 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java @@ -179,7 +179,7 @@ public class DorisOptions { Preconditions.checkArgument(database != null, "Database must not be null"); Preconditions.checkArgument(table != null, "Table must not be null"); Preconditions.checkArgument(bufferFlushMaxRows >= 10000, "BufferFlushMaxRows must be greater than 10000"); - Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 1024, "BufferFlushMaxBytes must be greater than 10MB"); + Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 1024, "BufferFlushMaxBytes must be greater than 10485760(10MB)"); Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be greater than 0"); return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries, deletable); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org