This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d4af6308 (Fix)[case] Fix unstable cases (#415)
d4af6308 is described below

commit d4af630843b4605f93686517d1f3939f300c40d2
Author: wudi <676366...@qq.com>
AuthorDate: Tue Jul 2 11:06:25 2024 +0800

    (Fix)[case] Fix unstable cases (#415)
---
 .../doris/flink/sink/copy/BatchStageLoad.java      |  5 +++
 .../doris/flink/sink/copy/DorisCopyWriter.java     |  4 +-
 .../java/org/apache/doris/flink/sink/TestUtil.java | 42 +++++++++++++++++++
 .../flink/sink/batch/TestDorisBatchStreamLoad.java | 23 ++---------
 .../doris/flink/sink/copy/TestDorisCopyWriter.java | 48 +++++++++++-----------
 5 files changed, 75 insertions(+), 47 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index 0080dee1..be8adcb0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -427,4 +427,9 @@ public class BatchStageLoad implements Serializable {
     public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
         this.httpClientBuilder = httpClientBuilder;
     }
+
+    @VisibleForTesting
+    public boolean isLoadThreadAlive() {
+        return loadThreadAlive;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
index 469b3f2d..b47f2ebe 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
@@ -197,7 +197,7 @@ public class DorisCopyWriter<IN>
     }
 
     @VisibleForTesting
-    public void setBatchStageLoad(BatchStageLoad batchStageLoad) {
-        this.batchStageLoad = batchStageLoad;
+    public BatchStageLoad getBatchStageLoad() {
+        return batchStageLoad;
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java
new file mode 100644
index 00000000..9858e05e
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestUtil.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.util.concurrent.TimeoutException;
+
+public class TestUtil {
+
+    public static void waitUntilCondition(
+            SupplierWithException<Boolean, Exception> condition,
+            Deadline timeout,
+            long retryIntervalMillis,
+            String errorMsg)
+            throws Exception {
+        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
+            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+        }
+
+        if (!timeout.hasTimeLeft()) {
+            throw new TimeoutException(errorMsg);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index d73ff440..6080db2d 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -18,13 +18,13 @@
 package org.apache.doris.flink.sink.batch;
 
 import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.HttpTestUtil;
+import org.apache.doris.flink.sink.TestUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -97,7 +96,7 @@ public class TestDorisBatchStreamLoad {
         DorisBatchStreamLoad loader =
                 new DorisBatchStreamLoad(
                         options, readOptions, executionOptions, new LabelGenerator("label", false));
-        waitUntilCondition(
+        TestUtil.waitUntilCondition(
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
@@ -137,7 +136,7 @@ public class TestDorisBatchStreamLoad {
                 new DorisBatchStreamLoad(
                         options, readOptions, executionOptions, new LabelGenerator("label", false));
 
-        waitUntilCondition(
+        TestUtil.waitUntilCondition(
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
@@ -168,20 +167,4 @@ public class TestDorisBatchStreamLoad {
             backendUtilMockedStatic.close();
         }
     }
-
-    public static void waitUntilCondition(
-            SupplierWithException<Boolean, Exception> condition,
-            Deadline timeout,
-            long retryIntervalMillis,
-            String errorMsg)
-            throws Exception {
-        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
-            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
-            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
-        }
-
-        if (!timeout.hasTimeLeft()) {
-            throw new TimeoutException(errorMsg);
-        }
-    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
index c36805ba..9e1327be 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.copy;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.connector.sink2.Sink;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -24,8 +25,8 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.sink.TestUtil;
 import org.apache.doris.flink.sink.writer.DorisWriterState;
-import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -36,8 +37,10 @@ import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -62,18 +65,9 @@ public class TestDorisCopyWriter {
         HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         when(httpClientBuilder.build()).thenReturn(httpClient);
-        BatchStageLoad stageLoad =
-                new BatchStageLoad(
-                        dorisOptions,
-                        readOptions,
-                        executionOptions,
-                        new LabelGenerator("label", true));
-        stageLoad.setHttpClientBuilder(httpClientBuilder);
-
         CloseableHttpResponse uploadResponse = HttpTestUtil.getResponse("", false, true);
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse("", true, false);
         when(httpClient.execute(any())).thenReturn(uploadResponse).thenReturn(preCommitResponse);
-
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         // when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisCopyWriter<String> copyWriter =
@@ -83,8 +77,15 @@ public class TestDorisCopyWriter {
                         dorisOptions,
                         readOptions,
                         executionOptions);
-        copyWriter.setBatchStageLoad(stageLoad);
-        stageLoad.setCurrentCheckpointID(1);
+        copyWriter.getBatchStageLoad().setHttpClientBuilder(httpClientBuilder);
+        copyWriter.getBatchStageLoad().setCurrentCheckpointID(1);
+
+        TestUtil.waitUntilCondition(
+                () -> copyWriter.getBatchStageLoad().isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                100L,
+                "Condition was not met in given timeout.");
+        Assert.assertTrue(copyWriter.getBatchStageLoad().isLoadThreadAlive());
         // no data
         Collection<DorisCopyCommittable> committableList = copyWriter.prepareCommit();
         Assert.assertEquals(0, committableList.size());
@@ -98,18 +99,16 @@ public class TestDorisCopyWriter {
         DorisCopyCommittable committable = committableList.toArray(new DorisCopyCommittable[0])[0];
         Assert.assertEquals("127.0.0.1:8030", committable.getHostPort());
 
+        Pattern copySql =
+                Pattern.compile(
+                        "COPY INTO `db`.`table` FROM @~\\('.doris_[0-9a-f]{32}_table_0_1_0}'\\)");
         // todo: compare properties
-        Assert.assertTrue(
-                committable
-                        .getCopySQL()
-                        .startsWith("COPY INTO `db`.`table` FROM @~('{label_table_0_1_0}')"));
+        Assert.assertTrue(copySql.matcher(committable.getCopySQL()).find());
         copyWriter.close();
     }
 
     @Test
     public void testSnapshot() throws Exception {
-        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
-
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         // when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisCopyWriter<String> copyWriter =
@@ -119,13 +118,12 @@ public class TestDorisCopyWriter {
                         dorisOptions,
                         readOptions,
                         executionOptions);
-        BatchStageLoad stageLoad =
-                new BatchStageLoad(
-                        dorisOptions,
-                        readOptions,
-                        executionOptions,
-                        new LabelGenerator("label", true));
-        copyWriter.setBatchStageLoad(stageLoad);
+        TestUtil.waitUntilCondition(
+                () -> copyWriter.getBatchStageLoad().isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                100L,
+                "Condition was not met in given timeout.");
+        Assert.assertTrue(copyWriter.getBatchStageLoad().isLoadThreadAlive());
         List<DorisWriterState> writerStates = copyWriter.snapshotState(1);
         Assert.assertTrue(writerStates.isEmpty());
         copyWriter.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to