This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 2bcfad1d [Improve](Stream) Optimize the situation where Flink may get stuck after the streamload thread exits abnormally (#578) 2bcfad1d is described below commit 2bcfad1dd00552992f6ab397caa95832798b3219 Author: wudi <676366...@qq.com> AuthorDate: Tue Mar 18 16:34:00 2025 +0800 [Improve](Stream) Optimize the situation where Flink may get stuck after the streamload thread exits abnormally (#578) --- .../doris/flink/cfg/DorisExecutionOptions.java | 2 +- .../exception/LabelAlreadyExistsException.java | 2 +- .../doris/flink/sink/writer/DorisStreamLoad.java | 118 +++++++++++++++----- .../doris/flink/sink/writer/DorisWriter.java | 116 +++++--------------- .../doris/flink/sink/writer/LoadConstants.java | 9 ++ .../doris/flink/sink/writer/RecordBuffer.java | 34 +++--- .../doris/flink/sink/writer/RecordStream.java | 7 +- .../doris/flink/table/DorisConfigOptions.java | 4 +- .../sink/DorisSinkMultiTblFailoverITCase.java | 121 ++++++++++++++++++++- .../doris/flink/sink/writer/TestDorisWriter.java | 54 ++++----- 10 files changed, 299 insertions(+), 168 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 371069f5..831a317e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -34,7 +34,7 @@ public class DorisExecutionOptions implements Serializable { private static final long serialVersionUID = 1L; // 0 means disable checker thread - public static final int DEFAULT_CHECK_INTERVAL = 10000; + public static final int DEFAULT_CHECK_INTERVAL = 0; public static final int DEFAULT_MAX_RETRY_TIMES = 3; private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; private static final int DEFAULT_BUFFER_COUNT = 3; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java index ea86c60a..9a523b16 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java @@ -17,7 +17,7 @@ package org.apache.doris.flink.exception; -public class LabelAlreadyExistsException extends RuntimeException { +public class LabelAlreadyExistsException extends DorisRuntimeException { public LabelAlreadyExistsException() { super(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index c6e39326..18a97959 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -28,10 +28,12 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.exception.LabelAlreadyExistsException; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; +import org.apache.doris.flink.sink.LoadStatus; import org.apache.doris.flink.sink.ResponseUtil; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -47,6 +49,7 @@ import java.net.NoRouteToHostException; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -61,6 +64,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE; import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; +import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_SUCCESS_STATUS; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; @@ -88,7 +92,7 @@ public class DorisStreamLoad implements Serializable { private final boolean enableDelete; private final Properties streamLoadProp; private final RecordStream recordStream; - private volatile Future<CloseableHttpResponse> pendingLoadFuture; + private volatile Future<RespContent> pendingLoadFuture; private volatile Exception httpException = null; private final CloseableHttpClient httpClient; private final ExecutorService executorService; @@ -175,7 +179,7 @@ public class DorisStreamLoad implements Serializable { this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); } - public Future<CloseableHttpResponse> getPendingLoadFuture() { + public Future<RespContent> getPendingLoadFuture() { return pendingLoadFuture; } @@ -254,19 +258,30 @@ public class DorisStreamLoad implements Serializable { * @param record * @throws IOException */ - public void writeRecord(byte[] record) throws IOException { + public void writeRecord(byte[] record) throws InterruptedException { checkLoadException(); - if (loadBatchFirstRecord) { - loadBatchFirstRecord = false; - } else if (lineDelimiter != null) { - recordStream.write(lineDelimiter); + try { + if (loadBatchFirstRecord) { + loadBatchFirstRecord = false; + } else if (lineDelimiter != null) { + recordStream.write(lineDelimiter); + } + recordStream.write(record); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (httpException != null) { + throw new DorisRuntimeException(httpException.getMessage(), httpException); + } else { + LOG.info("write record interrupted, cause " + e.getClass()); + throw e; + } } - recordStream.write(record); } private void checkLoadException() { if (httpException != null) { - throw new RuntimeException("Stream load http request error, ", httpException); + throw new RuntimeException( + "Stream load http request error, " + httpException.getMessage(), httpException); } } @@ -292,26 +307,32 @@ public class DorisStreamLoad implements Serializable { throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); } - public RespContent stopLoad() throws IOException { - recordStream.endInput(); - if (enableGroupCommit) { - LOG.info("table {} stream load stopped with group commit on host {}", table, hostPort); - } else { - LOG.info( - "table {} stream load stopped for {} on host {}", - table, - currentLabel, - hostPort); - } - - Preconditions.checkState(pendingLoadFuture != null); + public RespContent stopLoad() throws InterruptedException { try { - return handlePreCommitResponse(pendingLoadFuture.get()); - } catch (NoRouteToHostException nex) { - LOG.error("Failed to connect, cause ", nex); - throw new DorisRuntimeException( - "No Route to Host to " + hostPort + ", exception: " + nex); - } catch (Exception e) { + recordStream.endInput(); + if (enableGroupCommit) { + LOG.info( + "table {} stream load stopped with group commit on host {}", + table, + hostPort); + } else { + LOG.info( + "table {} stream load stopped for {} on host {}", + table, + currentLabel, + hostPort); + } + + Preconditions.checkState(pendingLoadFuture != null); + return pendingLoadFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (httpException != null) { + throw new DorisRuntimeException(httpException.getMessage(), httpException); + } else { + throw e; + } + } catch (ExecutionException e) { throw new DorisRuntimeException(e); } } @@ -365,7 +386,46 @@ public class DorisStreamLoad implements Serializable { () -> { LOG.info(executeMessage); try { - return httpClient.execute(putBuilder.build()); + CloseableHttpResponse execute = + httpClient.execute(putBuilder.build()); + RespContent respContent = handlePreCommitResponse(execute); + + if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { + if (enable2PC + && LoadStatus.LABEL_ALREADY_EXIST.equals( + respContent.getStatus()) + && !JOB_EXIST_FINISHED.equals( + respContent.getExistingJobStatus())) { + LOG.info( + "try to abort {} cause status {}, exist job status {} ", + respContent.getLabel(), + respContent.getStatus(), + respContent.getExistingJobStatus()); + abortLabelExistTransaction(respContent); + throw new LabelAlreadyExistsException( + "Exist label abort finished, retry"); + } else { + String errMsg = + String.format( + "table %s.%s stream load error: %s, see more in %s", + getDb(), + getTable(), + respContent.getMessage(), + respContent.getErrorURL()); + LOG.error("Failed to load, {}", errMsg); + throw new DorisRuntimeException(errMsg); + } + } + return respContent; + } catch (NoRouteToHostException nex) { + LOG.error("Failed to connect, cause ", nex); + httpException = nex; + mainThread.interrupt(); + throw new DorisRuntimeException( + "No Route to Host to " + + hostPort + + ", exception: " + + nex); } catch (Exception e) { LOG.error("Failed to execute load, cause ", e); httpException = e; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 466b995f..64fbc95f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -27,7 +27,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; -import org.apache.doris.flink.exception.LabelAlreadyExistsException; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.BackendUtil; @@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -49,12 +47,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; -import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; -import static org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINISHED; /** * Doris Writer will load data to doris. @@ -64,8 +56,6 @@ import static org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINIS public class DorisWriter<IN> implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> { private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); - private static final List<String> DORIS_SUCCESS_STATUS = - new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long lastCheckpointId; private long curCheckpointId; private Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>(); @@ -86,7 +76,6 @@ public class DorisWriter<IN> private SinkWriterMetricGroup sinkMetricGroup; private Map<String, DorisWriteMetrics> sinkMetricsMap = new ConcurrentHashMap<>(); private volatile boolean multiTableLoad = false; - private final ReentrantLock checkLock = new ReentrantLock(); public DorisWriter( Sink.InitContext initContext, @@ -139,13 +128,6 @@ public class DorisWriter<IN> } // get main work thread. executorThread = Thread.currentThread(); - if (intervalTime >= 1000) { - // when uploading data in streaming mode, we need to regularly detect whether there are - // exceptions. - LOG.info("start stream load checkdone thread with interval {} ms", intervalTime); - scheduledExecutorService.scheduleWithFixedDelay( - this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); - } } @VisibleForTesting @@ -242,59 +224,32 @@ public class DorisWriter<IN> } // disable exception checker before stop load. globalLoading = false; - checkLock.lockInterruptibly(); - try { - // submit stream load http request - List<DorisCommittable> committableList = new ArrayList<>(); - for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) { - String tableIdentifier = streamLoader.getKey(); - if (!loadingMap.getOrDefault(tableIdentifier, false)) { - LOG.debug("skip table {}, no data need to load.", tableIdentifier); - continue; - } - DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); - RespContent respContent = dorisStreamLoad.stopLoad(); - // refresh metrics - if (sinkMetricsMap.containsKey(tableIdentifier)) { - DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier); - dorisWriteMetrics.flush(respContent); - } - if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - if (executionOptions.enabled2PC() - && LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus()) - && !JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) { - LOG.info( - "try to abort {} cause status {}, exist job status {} ", - respContent.getLabel(), - respContent.getStatus(), - respContent.getExistingJobStatus()); - dorisStreamLoad.abortLabelExistTransaction(respContent); - throw new LabelAlreadyExistsException("Exist label abort finished, retry"); - } else { - String errMsg = - String.format( - "table %s stream load error: %s, see more in %s", - tableIdentifier, - respContent.getMessage(), - respContent.getErrorURL()); - LOG.error("Failed to load, {}", errMsg); - throw new DorisRuntimeException(errMsg); - } - } - if (executionOptions.enabled2PC()) { - long txnId = respContent.getTxnId(); - committableList.add( - new DorisCommittable( - dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); - } + // submit stream load http request + List<DorisCommittable> committableList = new ArrayList<>(); + for (Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()) { + String tableIdentifier = streamLoader.getKey(); + if (!loadingMap.getOrDefault(tableIdentifier, false)) { + LOG.debug("skip table {}, no data need to load.", tableIdentifier); + continue; + } + DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); + RespContent respContent = dorisStreamLoad.stopLoad(); + // refresh metrics + if (sinkMetricsMap.containsKey(tableIdentifier)) { + DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier); + dorisWriteMetrics.flush(respContent); + } + if (executionOptions.enabled2PC()) { + long txnId = respContent.getTxnId(); + committableList.add( + new DorisCommittable( + dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); } - - // clean loadingMap - loadingMap.clear(); - return committableList; - } finally { - checkLock.unlock(); } + + // clean loadingMap + loadingMap.clear(); + return committableList; } private void abortPossibleSuccessfulTransaction() { @@ -358,25 +313,15 @@ public class DorisWriter<IN> new HttpUtil(dorisReadOptions).getHttpClient())); } - /** Check the streamload http request regularly. */ + /** Http throws an exception actively, there is no need to check regularly. */ + @Deprecated private void checkDone() { if (!globalLoading) { return; } LOG.debug("start timer checker, interval {} ms", intervalTime); - if (checkLock.tryLock()) { - try { - // double check - if (!globalLoading) { - return; - } - for (Map.Entry<String, DorisStreamLoad> streamLoadMap : - dorisStreamLoadMap.entrySet()) { - checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); - } - } finally { - checkLock.unlock(); - } + for (Map.Entry<String, DorisStreamLoad> streamLoadMap : dorisStreamLoadMap.entrySet()) { + checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); } } @@ -401,6 +346,7 @@ public class DorisWriter<IN> // use send cached data to new txn, then notify to restart the stream if (executionOptions.isUseCache()) { try { + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId)); if (executionOptions.enabled2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); @@ -419,9 +365,7 @@ public class DorisWriter<IN> } else { String errorMsg; try { - RespContent content = - dorisStreamLoad.handlePreCommitResponse( - dorisStreamLoad.getPendingLoadFuture().get()); + RespContent content = dorisStreamLoad.getPendingLoadFuture().get(); if (executionOptions.enabled2PC() && LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) { LOG.info( diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java index 1e026977..54deb4a0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java @@ -17,6 +17,13 @@ package org.apache.doris.flink.sink.writer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; +import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; + /** Constants for load. */ public class LoadConstants { public static final String COLUMNS_KEY = "columns"; @@ -35,4 +42,6 @@ public class LoadConstants { public static final String GROUP_COMMIT_OFF_MODE = "off_mode"; public static final String COMPRESS_TYPE = "compress_type"; public static final String COMPRESS_TYPE_GZ = "gz"; + public static final List<String> DORIS_SUCCESS_STATUS = + new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java index c84e1039..aeecaed9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java @@ -64,25 +64,21 @@ public class RecordBuffer { } } - public void stopBufferData() throws IOException { - try { - // add Empty buffer as finish flag. - boolean isEmpty = false; - if (currentWriteBuffer != null) { - currentWriteBuffer.flip(); - // check if the current write buffer is empty. - isEmpty = currentWriteBuffer.limit() == 0; - readQueue.put(currentWriteBuffer); - currentWriteBuffer = null; - } - if (!isEmpty) { - ByteBuffer byteBuffer = writeQueue.take(); - byteBuffer.flip(); - Preconditions.checkState(byteBuffer.limit() == 0); - readQueue.put(byteBuffer); - } - } catch (Exception e) { - throw new IOException(e); + public void stopBufferData() throws InterruptedException { + // add Empty buffer as finish flag. + boolean isEmpty = false; + if (currentWriteBuffer != null) { + currentWriteBuffer.flip(); + // check if the current write buffer is empty. + isEmpty = currentWriteBuffer.limit() == 0; + readQueue.put(currentWriteBuffer); + currentWriteBuffer = null; + } + if (!isEmpty) { + ByteBuffer byteBuffer = writeQueue.take(); + byteBuffer.flip(); + Preconditions.checkState(byteBuffer.limit() == 0); + readQueue.put(byteBuffer); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java index 693967ac..2fe51734 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java @@ -45,7 +45,7 @@ public class RecordStream extends InputStream { recordBuffer.startBufferData(); } - public void endInput() throws IOException { + public void endInput() throws InterruptedException { recordBuffer.stopBufferData(); } @@ -54,15 +54,16 @@ public class RecordStream extends InputStream { try { return recordBuffer.read(buff); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } - public void write(byte[] buff) throws IOException { + public void write(byte[] buff) throws InterruptedException { try { recordBuffer.write(buff); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw e; } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index b99710e3..ffd3ec92 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -216,9 +216,9 @@ public class DorisConfigOptions { public static final ConfigOption<Duration> SINK_CHECK_INTERVAL = ConfigOptions.key("sink.check-interval") .durationType() - .defaultValue(Duration.ofMillis(10000)) + .defaultValue(Duration.ofMillis(0)) .withDescription( - "check exception with the interval while loading, The default is 1s, 0 means disabling the checker thread"); + "check exception with the interval while loading, 0 means disabling the checker thread"); public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries") .intType() diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java index d5dd6927..f4146990 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -48,6 +49,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.api.common.JobStatus.FINISHED; import static org.apache.flink.api.common.JobStatus.RUNNING; /** DorisSink abnormal case of multi-table writing */ @@ -57,6 +59,7 @@ public class DorisSinkMultiTblFailoverITCase extends AbstractITCaseService { LoggerFactory.getLogger(DorisSinkMultiTblFailoverITCase.class); static final String DATABASE = "test_multi_failover_sink"; static final String TABLE_MULTI_CSV = "tbl_multi_csv"; + static final String TABLE_MULTI_CSV_NO_EXIST_TBL = "tbl_multi_csv_no_exist"; private final boolean batchMode; public DorisSinkMultiTblFailoverITCase(boolean batchMode) { @@ -68,12 +71,121 @@ public class DorisSinkMultiTblFailoverITCase extends AbstractITCaseService { return new Object[][] {new Object[] {false}, new Object[] {true}}; } + /** + * In an extreme case, during a checkpoint, a piece of data written is bufferCount*bufferSize + */ + @Test + public void testTableNotExistCornerCase() throws Exception { + LOG.info("Start to testTableNotExistCornerCase"); + dropDatabase(); + dropTable(TABLE_MULTI_CSV_NO_EXIST_TBL); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getCheckpointConfig().setCheckpointTimeout(300 * 1000); + env.setParallelism(1); + int checkpointIntervalMs = 10000; + env.enableCheckpointing(checkpointIntervalMs); + + Properties properties = new Properties(); + properties.setProperty("column_separator", ","); + properties.setProperty("format", "csv"); + DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder(); + DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); + executionBuilder + .setLabelPrefix(UUID.randomUUID().toString()) + .enable2PC() + .setBatchMode(batchMode) + .setFlushQueueSize(1) + .setBufferSize(1) + .setBufferCount(3) + .setCheckInterval(0) + .setStreamLoadProp(properties); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder + .setFenodes(getFenodes()) + .setTableIdentifier("") + .setUsername(getDorisUsername()) + .setPassword(getDorisPassword()); + + builder.setDorisReadOptions(DorisReadOptions.builder().build()) + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(new RecordWithMetaSerializer()) + .setDorisOptions(dorisBuilder.build()); + + DataStreamSource<RecordWithMeta> mockSource = + env.addSource( + new SourceFunction<RecordWithMeta>() { + @Override + public void run(SourceContext<RecordWithMeta> ctx) throws Exception { + RecordWithMeta record3 = + new RecordWithMeta( + DATABASE, TABLE_MULTI_CSV_NO_EXIST_TBL, "1,3"); + ctx.collect(record3); + } + + @Override + public void cancel() {} + }); + + mockSource.sinkTo(builder.build()); + JobClient jobClient = env.executeAsync(); + CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus(); + LOG.info("Job status: {}", jobStatus); + + waitForJobStatus( + jobClient, + Collections.singletonList(RUNNING), + Deadline.fromNow(Duration.ofSeconds(60))); + // wait checkpoint failure + List<JobStatus> errorStatus = + Arrays.asList( + JobStatus.FAILING, + JobStatus.CANCELLING, + JobStatus.CANCELED, + JobStatus.FAILED, + JobStatus.RESTARTING); + + waitForJobStatus(jobClient, errorStatus, Deadline.fromNow(Duration.ofSeconds(30))); + + LOG.info("start to create add table"); + initializeTable(TABLE_MULTI_CSV_NO_EXIST_TBL); + + LOG.info("wait job restart success"); + // wait table restart success + waitForJobStatus( + jobClient, + Collections.singletonList(RUNNING), + Deadline.fromNow(Duration.ofSeconds(60))); + + LOG.info("wait job running finished"); + waitForJobStatus( + jobClient, + Collections.singletonList(FINISHED), + Deadline.fromNow(Duration.ofSeconds(60))); + + String queryRes = + String.format( + "select id,task_id from %s.%s ", DATABASE, TABLE_MULTI_CSV_NO_EXIST_TBL); + List<String> expected = Arrays.asList("1,3"); + + if (!batchMode) { + ContainerUtils.checkResult( + getDorisQueryConnection(), LOG, expected, queryRes, 2, false); + } else { + List<String> actualResult = + ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, queryRes, 2); + LOG.info("actual size: {}, expected size: {}", actualResult.size(), expected.size()); + Assert.assertTrue( + actualResult.size() >= expected.size() && actualResult.containsAll(expected)); + } + } + /** * Four exceptions are simulated in one job 1. Add a table that does not exist 2. flink * checkpoint failed 3. doris cluster restart 4. stream load fail */ @Test - public void testDorisClusterFailoverSink() throws Exception { + public void testMultiTblFailoverSink() throws Exception { + LOG.info("Start to testMultiTblFailoverSink"); int totalTblNum = 3; for (int i = 1; i <= totalTblNum; i++) { String tableName = TABLE_MULTI_CSV + i; @@ -274,4 +386,11 @@ public class DorisSinkMultiTblFailoverITCase extends AbstractITCaseService { LOG, String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table)); } + + private void dropDatabase() { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index 3e1ab2c2..c1f59c74 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -26,20 +26,18 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; -import org.apache.doris.flink.exception.LabelAlreadyExistsException; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.mockito.MockedStatic; import java.io.IOException; @@ -51,7 +49,7 @@ import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; @@ -61,9 +59,7 @@ public class TestDorisWriter { DorisOptions dorisOptions; DorisReadOptions readOptions; DorisExecutionOptions executionOptions; - private MockedStatic<BackendUtil> backendUtilMockedStatic; - @Rule public ExpectedException thrown = ExpectedException.none(); @Before public void setUp() { @@ -79,7 +75,7 @@ public class TestDorisWriter { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(preCommitResponse); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse); DorisWriter<String> dorisWriter = initWriter(httpClient); dorisWriter.write("doris,1", null); Collection<DorisCommittable> committableList = dorisWriter.prepareCommit(); @@ -96,12 +92,14 @@ public class TestDorisWriter { CloseableHttpClient httpClient = mock(CloseableHttpClient.class); CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(preCommitResponse); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse); DorisWriter<String> dorisWriter = initWriter(httpClient); dorisWriter.write("doris,1", null); - thrown.expect(LabelAlreadyExistsException.class); - thrown.expectMessage("Exist label abort finished, retry"); - dorisWriter.prepareCommit(); + try { + dorisWriter.prepareCommit(); + } catch (DorisRuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Exist label abort finished, retry")); + } } @Test @@ -110,12 +108,14 @@ public class TestDorisWriter { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse( HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_FINISH_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(preCommitResponse); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse); DorisWriter<String> dorisWriter = initWriter(httpClient); dorisWriter.write("doris,1", null); - thrown.expect(DorisRuntimeException.class); - thrown.expectMessage(contains("stream load error")); - dorisWriter.prepareCommit(); + try { + dorisWriter.prepareCommit(); + } catch (DorisRuntimeException e) { + Assert.assertTrue(e.getMessage().contains("stream load error")); + } } @Test @@ -124,7 +124,7 @@ public class TestDorisWriter { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse( HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_FINISH_RESPONSE, true); - when(httpClient.execute(any())).thenReturn(preCommitResponse); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse); Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>(); Map<String, DorisWriteMetrics> dorisWriteMetricsMap = new ConcurrentHashMap<>(); Sink.InitContext initContext = mock(Sink.InitContext.class); @@ -154,13 +154,14 @@ public class TestDorisWriter { new LabelGenerator("", true), httpClient); dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad); - dorisStreamLoad.startLoad("", false); dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap); dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap); dorisWriter.write("doris,1", null); - thrown.expect(DorisRuntimeException.class); - thrown.expectMessage(contains("stream load error")); - dorisWriter.prepareCommit(); + try { + dorisWriter.prepareCommit(); + } catch (DorisRuntimeException e) { + Assert.assertTrue(e.getMessage().contains("stream load error")); + } } @Test @@ -171,9 +172,11 @@ public class TestDorisWriter { when(httpClient.execute(any())).thenReturn(preCommitResponse); DorisWriter<String> dorisWriter = initWriter(httpClient); dorisWriter.write("doris,1", null); - thrown.expect(DorisRuntimeException.class); - thrown.expectMessage(contains("stream load error")); - dorisWriter.prepareCommit(); + try { + dorisWriter.prepareCommit(); + } catch (DorisRuntimeException e) { + Assert.assertTrue(e.getMessage().contains("stream load error")); + } } private DorisWriter<String> initWriter(CloseableHttpClient httpClient) throws IOException { @@ -205,7 +208,6 @@ public class TestDorisWriter { new LabelGenerator("", true), httpClient); dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad); - dorisStreamLoad.startLoad("", false); dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap); dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap); return dorisWriter; @@ -219,13 +221,13 @@ public class TestDorisWriter { when(httpClient.execute(any())).thenReturn(preCommitResponse); DorisWriter<String> dorisWriter = initWriter(httpClient); BackendUtil mock = mock(BackendUtil.class); - when(mock.tryHttpConnection(any())).thenReturn(true); + when(mock.tryHttpConnection(anyString())).thenReturn(true); dorisWriter.setBackendUtil(mock); + dorisWriter.write("doris,1", null); List<DorisWriterState> writerStates = dorisWriter.snapshotState(1); Assert.assertEquals(1, writerStates.size()); Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix()); - Assert.assertTrue(!dorisWriter.isLoading()); } public DorisWriteMetrics getMockWriteMetrics(SinkWriterMetricGroup sinkWriterMetricGroup) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org