pvary commented on code in PR #11497:
URL: https://github.com/apache/iceberg/pull/11497#discussion_r2063950514


##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java:
##########
@@ -48,16 +48,7 @@ class MaintenanceTaskInfraExtension implements BeforeEachCallback {
 
   @Override
   public void beforeEach(ExtensionContext context) {
-    env = StreamExecutionEnvironment.getExecutionEnvironment();
-    source = new ManualSource<>(env, TypeInformation.of(Trigger.class));
-    // Adds the watermark to mimic the behaviour expected for the input of the maintenance tasks
-    triggerStream =
-        source
-            .dataStream()
-            .assignTimestampsAndWatermarks(new TableMaintenance.PunctuatedWatermarkStrategy())
-            .name(IGNORED_OPERATOR_NAME)
-            .forceNonParallel();
-    sink = new CollectingSink<>();
+    init(StreamExecutionEnvironment.getExecutionEnvironment());

Review Comment:
   Reverted



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java:
##########
@@ -37,16 +39,54 @@ class MaintenanceTaskTestBase extends OperatorTestBase {
   @RegisterExtension MaintenanceTaskInfraExtension infra = new MaintenanceTaskInfraExtension();
 
   void runAndWaitForSuccess(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink)
+      throws Exception {
+    runAndWaitForResult(env, triggerSource, collectingSink, false, null, () -> true);
+  }
+
+  void runAndWaitForSuccess(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink,
+      Supplier<Boolean> waitForCondition)
+      throws Exception {
+    runAndWaitForResult(env, triggerSource, collectingSink, false, null, waitForCondition);
+  }
+
+  Configuration runAndWaitForSavepoint(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink,
+      File savepointDir)
+      throws Exception {
+
+    return runAndWaitForResult(env, triggerSource, collectingSink, false, savepointDir, () -> true);
+  }
+
+  void runAndWaitForFailure(
+      StreamExecutionEnvironment env,
+      ManualSource<Trigger> triggerSource,
+      CollectingSink<TaskResult> collectingSink)
+      throws Exception {
+    runAndWaitForResult(env, triggerSource, collectingSink, true, null, () -> true);
+  }
+
+  Configuration runAndWaitForResult(

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to