This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new ab9412db [Fix] improve batch mode when streamload failed (#560) ab9412db is described below commit ab9412db00cd36f5e4372cc9c022f12e8d4b4b14 Author: wudi <676366...@qq.com> AuthorDate: Wed Feb 19 18:01:34 2025 +0800 [Fix] improve batch mode when streamload failed (#560) --- .../flink/sink/batch/DorisBatchStreamLoad.java | 24 ++++++++++++++----- .../doris/flink/sink/batch/DorisBatchWriter.java | 11 ++++++++- .../flink/sink/batch/TestDorisBatchStreamLoad.java | 27 +++++----------------- .../flink/sink/batch/TestDorisBatchWriter.java | 3 ++- 4 files changed, 36 insertions(+), 29 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 267a121c..c8473120 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -184,6 +184,7 @@ public class DorisBatchStreamLoad implements Serializable { public void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); + getLock(bufferKey).readLock().lock(); BatchRecordBuffer buffer = bufferMap.computeIfAbsent( @@ -198,6 +199,7 @@ public class DorisBatchStreamLoad implements Serializable { int bytes = buffer.insert(record); currentCacheBytes.addAndGet(bytes); getLock(bufferKey).readLock().unlock(); + if (currentCacheBytes.get() > maxBlockedBytes) { lock.lock(); try { @@ -258,8 +260,9 @@ public class DorisBatchStreamLoad implements Serializable { } private synchronized boolean flush(String bufferKey, boolean waitUtilDone) { - if (bufferMap.isEmpty()) { + if (!waitUtilDone && bufferMap.isEmpty()) { // bufferMap may have been flushed by other threads + 
LOG.info("bufferMap is empty, no need to flush {}", bufferKey); return false; } if (null == bufferKey) { @@ -295,6 +298,7 @@ public class DorisBatchStreamLoad implements Serializable { getLock(bufferKey).writeLock().unlock(); } if (buffer == null) { + LOG.info("buffer key is not exist {}, skipped", bufferKey); return; } buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable())); @@ -312,6 +316,9 @@ public class DorisBatchStreamLoad implements Serializable { } catch (InterruptedException e) { throw new RuntimeException("Failed to put record buffer to flush queue"); } + // When the load thread reports an error, the flushQueue will be cleared, + // and need to force a check for the exception. + checkFlushException(); } private void checkFlushException() { @@ -321,7 +328,9 @@ public class DorisBatchStreamLoad implements Serializable { } private void waitAsyncLoadFinish() { - for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) { + // Because the queue will have a drainTo operation, it needs to be multiplied by 2 + for (int i = 0; i < executionOptions.getFlushQueueSize() * 2 + 1; i++) { + // eof buffer BatchRecordBuffer empty = new BatchRecordBuffer(); putRecordToFlushQueue(empty); } @@ -335,8 +344,6 @@ public class DorisBatchStreamLoad implements Serializable { // close async executor this.loadExecutorService.shutdown(); this.started.set(false); - // clear buffer - this.flushQueue.clear(); } @VisibleForTesting @@ -407,10 +414,14 @@ public class DorisBatchStreamLoad implements Serializable { recordList.clear(); try { BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS); - if (buffer == null || buffer.getLabelName() == null) { - // label is empty and does not need to load. It is the flag of waitUtilDone + if (buffer == null) { continue; } + if (buffer.getLabelName() == null) { + // When the label is empty, it is the eof buffer for checkpoint flush. 
+ continue; + } + recordList.add(buffer); boolean merge = false; if (!flushQueue.isEmpty()) { @@ -424,6 +435,7 @@ public class DorisBatchStreamLoad implements Serializable { if (!merge) { for (BatchRecordBuffer bf : recordList) { if (bf == null || bf.getLabelName() == null) { + // When the label is empty, it's eof buffer for checkpointFlush. continue; } load(bf.getLabelName(), bf); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java index db486bcb..b0b256ee 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.sink.batch; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -67,6 +68,12 @@ public class DorisBatchWriter<IN> DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions executionOptions) { + + long restoreCheckpointId = + initContext + .getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + LOG.info("restore from checkpointId {}", restoreCheckpointId); if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) { String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); Preconditions.checkState( @@ -75,6 +82,7 @@ public class DorisBatchWriter<IN> this.database = tableInfo[0]; this.table = tableInfo[1]; } + LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.subtaskId = initContext.getSubtaskId(); this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); @@ -130,12 +138,13 @@ 
public class DorisBatchWriter<IN> @Override public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException { - // nothing to commit + checkFlushException(); return Collections.emptyList(); } @Override public List<DorisWriterState> snapshotState(long checkpointId) throws IOException { + checkFlushException(); return new ArrayList<>(); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java index 62d84c99..784334aa 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java @@ -45,7 +45,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays; import static org.mockito.ArgumentMatchers.any; @@ -124,17 +123,10 @@ public class TestDorisBatchStreamLoad { when(httpClientBuilder.build()).thenReturn(httpClient); when(httpClient.execute(any())).thenReturn(response); loader.writeRecord("db", "tbl", "1,data".getBytes()); - loader.checkpointFlush(); - TestUtil.waitUntilCondition( - () -> !loader.isLoadThreadAlive(), - Deadline.fromNow(Duration.ofSeconds(20)), - 100L, - "testLoadFail wait loader exit failed." 
+ loader.isLoadThreadAlive()); - AtomicReference<Throwable> exception = loader.getException(); - Assert.assertTrue(exception.get() instanceof Exception); - Assert.assertTrue(exception.get().getMessage().contains("stream load error")); - LOG.info("testLoadFail end"); + thrown.expect(Exception.class); + thrown.expectMessage("stream load error"); + loader.checkpointFlush(); } @Test @@ -175,17 +167,10 @@ public class TestDorisBatchStreamLoad { when(httpClientBuilder.build()).thenReturn(httpClient); when(httpClient.execute(any())).thenReturn(response); loader.writeRecord("db", "tbl", "1,data".getBytes()); - loader.checkpointFlush(); - TestUtil.waitUntilCondition( - () -> !loader.isLoadThreadAlive(), - Deadline.fromNow(Duration.ofSeconds(20)), - 100L, - "testLoadError wait loader exit failed." + loader.isLoadThreadAlive()); - AtomicReference<Throwable> exception = loader.getException(); - Assert.assertTrue(exception.get() instanceof Exception); - Assert.assertTrue(exception.get().getMessage().contains("stream load error")); - LOG.info("testLoadError end"); + thrown.expect(Exception.class); + thrown.expectMessage("stream load error"); + loader.checkpointFlush(); } @After diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java index 225dabd5..9704e250 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java @@ -57,7 +57,8 @@ public class TestDorisBatchWriter { .build(); thrown.expect(IllegalStateException.class); thrown.expectMessage("tableIdentifier input error"); - DorisBatchWriter batchWriter = new DorisBatchWriter(null, null, options, null, null); + Sink.InitContext initContext = mock(Sink.InitContext.class); + DorisBatchWriter batchWriter = new 
DorisBatchWriter(initContext, null, options, null, null); } @Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org