This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new d4af6308 (Fix)[case] Fix unstable cases (#415) d4af6308 is described below commit d4af630843b4605f93686517d1f3939f300c40d2 Author: wudi <676366...@qq.com> AuthorDate: Tue Jul 2 11:06:25 2024 +0800 (Fix)[case] Fix unstable cases (#415) --- .../doris/flink/sink/copy/BatchStageLoad.java | 5 +++ .../doris/flink/sink/copy/DorisCopyWriter.java | 4 +- .../java/org/apache/doris/flink/sink/TestUtil.java | 42 +++++++++++++++++++ .../flink/sink/batch/TestDorisBatchStreamLoad.java | 23 ++--------- .../doris/flink/sink/copy/TestDorisCopyWriter.java | 48 +++++++++++----------- 5 files changed, 75 insertions(+), 47 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java index 0080dee1..be8adcb0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java @@ -427,4 +427,9 @@ public class BatchStageLoad implements Serializable { public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) { this.httpClientBuilder = httpClientBuilder; } + + @VisibleForTesting + public boolean isLoadThreadAlive() { + return loadThreadAlive; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java index 469b3f2d..b47f2ebe 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java @@ -197,7 +197,7 @@ public class DorisCopyWriter<IN> } @VisibleForTesting - public void setBatchStageLoad(BatchStageLoad batchStageLoad) { - this.batchStageLoad = batchStageLoad; + public BatchStageLoad getBatchStageLoad() { + return batchStageLoad; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java new file mode 100644 index 00000000..9858e05e --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.util.function.SupplierWithException; + +import java.util.concurrent.TimeoutException; + +public class TestUtil { + + public static void waitUntilCondition( + SupplierWithException<Boolean, Exception> condition, + Deadline timeout, + long retryIntervalMillis, + String errorMsg) + throws Exception { + while (timeout.hasTimeLeft() && !(Boolean) condition.get()) { + long timeLeft = Math.max(0L, timeout.timeLeft().toMillis()); + Thread.sleep(Math.min(retryIntervalMillis, timeLeft)); + } + + if (!timeout.hasTimeLeft()) { + throw new TimeoutException(errorMsg); + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java index d73ff440..6080db2d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java @@ -18,13 +18,13 @@ package org.apache.doris.flink.sink.batch; import org.apache.flink.api.common.time.Deadline; -import org.apache.flink.util.function.SupplierWithException; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.HttpTestUtil; +import org.apache.doris.flink.sink.TestUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.ArgumentMatchers.any; @@ -97,7 +96,7 @@ public class TestDorisBatchStreamLoad { DorisBatchStreamLoad loader = new DorisBatchStreamLoad( options, readOptions, executionOptions, new LabelGenerator("label", false)); - waitUntilCondition( + TestUtil.waitUntilCondition( () -> loader.isLoadThreadAlive(), Deadline.fromNow(Duration.ofSeconds(10)), 100L, @@ -137,7 +136,7 @@ public class TestDorisBatchStreamLoad { new DorisBatchStreamLoad( options, readOptions, executionOptions, new LabelGenerator("label", false)); - waitUntilCondition( + TestUtil.waitUntilCondition( () -> loader.isLoadThreadAlive(), Deadline.fromNow(Duration.ofSeconds(10)), 100L, @@ -168,20 +167,4 @@ public class TestDorisBatchStreamLoad { backendUtilMockedStatic.close(); } } - - public static void waitUntilCondition( - SupplierWithException<Boolean, Exception> condition, - Deadline timeout, - long retryIntervalMillis, - String errorMsg) - throws Exception { - while (timeout.hasTimeLeft() && !(Boolean) condition.get()) { - long timeLeft = Math.max(0L, timeout.timeLeft().toMillis()); - Thread.sleep(Math.min(retryIntervalMillis, timeLeft)); - } - - if (!timeout.hasTimeLeft()) { - throw new TimeoutException(errorMsg); - } - } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java index c36805ba..9e1327be 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.copy; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.connector.sink2.Sink; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -24,8 +25,8 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; +import org.apache.doris.flink.sink.TestUtil; import org.apache.doris.flink.sink.writer.DorisWriterState; -import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; @@ -36,8 +37,10 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.regex.Pattern; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -62,18 +65,9 @@ public class TestDorisCopyWriter { HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class); CloseableHttpClient httpClient = mock(CloseableHttpClient.class); when(httpClientBuilder.build()).thenReturn(httpClient); - BatchStageLoad stageLoad = - new BatchStageLoad( - dorisOptions, - readOptions, - executionOptions, - new LabelGenerator("label", true)); - stageLoad.setHttpClientBuilder(httpClientBuilder); - CloseableHttpResponse uploadResponse = HttpTestUtil.getResponse("", false, true); CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse("", true, false); when(httpClient.execute(any())).thenReturn(uploadResponse).thenReturn(preCommitResponse); - Sink.InitContext initContext = mock(Sink.InitContext.class); // when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisCopyWriter<String> copyWriter = @@ -83,8 +77,15 @@ public class TestDorisCopyWriter { dorisOptions, readOptions, executionOptions); - copyWriter.setBatchStageLoad(stageLoad); - stageLoad.setCurrentCheckpointID(1); + copyWriter.getBatchStageLoad().setHttpClientBuilder(httpClientBuilder); + copyWriter.getBatchStageLoad().setCurrentCheckpointID(1); + + TestUtil.waitUntilCondition( + () -> copyWriter.getBatchStageLoad().isLoadThreadAlive(), + Deadline.fromNow(Duration.ofSeconds(10)), + 100L, + "Condition was not met in given timeout."); + Assert.assertTrue(copyWriter.getBatchStageLoad().isLoadThreadAlive()); // no data Collection<DorisCopyCommittable> committableList = copyWriter.prepareCommit(); Assert.assertEquals(0, committableList.size()); @@ -98,18 +99,16 @@ public class TestDorisCopyWriter { DorisCopyCommittable committable = committableList.toArray(new DorisCopyCommittable[0])[0]; Assert.assertEquals("127.0.0.1:8030", committable.getHostPort()); + Pattern copySql = + Pattern.compile( + "COPY INTO `db`.`table` FROM @~\\('.doris_[0-9a-f]{32}_table_0_1_0}'\\)"); // todo: compare properties - Assert.assertTrue( - committable - .getCopySQL() - .startsWith("COPY INTO `db`.`table` FROM @~('{label_table_0_1_0}')")); + Assert.assertTrue(copySql.matcher(committable.getCopySQL()).find()); copyWriter.close(); } @Test public void testSnapshot() throws Exception { - CloseableHttpClient httpClient = mock(CloseableHttpClient.class); - Sink.InitContext initContext = mock(Sink.InitContext.class); // when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisCopyWriter<String> copyWriter = @@ -119,13 +118,12 @@ public class TestDorisCopyWriter { dorisOptions, readOptions, executionOptions); - BatchStageLoad stageLoad = - new BatchStageLoad( - dorisOptions, - readOptions, - executionOptions, - new LabelGenerator("label", true)); - copyWriter.setBatchStageLoad(stageLoad); + TestUtil.waitUntilCondition( + () -> copyWriter.getBatchStageLoad().isLoadThreadAlive(), + Deadline.fromNow(Duration.ofSeconds(10)), + 100L, + "Condition was not met in given timeout."); + Assert.assertTrue(copyWriter.getBatchStageLoad().isLoadThreadAlive()); List<DorisWriterState> writerStates = copyWriter.snapshotState(1); Assert.assertTrue(writerStates.isEmpty()); copyWriter.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org