pvary commented on code in PR #11497: URL: https://github.com/apache/iceberg/pull/11497#discussion_r2063950514
########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java: ########## @@ -48,16 +48,7 @@ class MaintenanceTaskInfraExtension implements BeforeEachCallback { @Override public void beforeEach(ExtensionContext context) { - env = StreamExecutionEnvironment.getExecutionEnvironment(); - source = new ManualSource<>(env, TypeInformation.of(Trigger.class)); - // Adds the watermark to mimic the behaviour expected for the input of the maintenance tasks - triggerStream = - source - .dataStream() - .assignTimestampsAndWatermarks(new TableMaintenance.PunctuatedWatermarkStrategy()) - .name(IGNORED_OPERATOR_NAME) - .forceNonParallel(); - sink = new CollectingSink<>(); + init(StreamExecutionEnvironment.getExecutionEnvironment()); Review Comment: Reverted ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java: ########## @@ -37,16 +39,54 @@ class MaintenanceTaskTestBase extends OperatorTestBase { @RegisterExtension MaintenanceTaskInfraExtension infra = new MaintenanceTaskInfraExtension(); void runAndWaitForSuccess( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + CollectingSink<TaskResult> collectingSink) + throws Exception { + runAndWaitForResult(env, triggerSource, collectingSink, false, null, () -> true); + } + + void runAndWaitForSuccess( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + CollectingSink<TaskResult> collectingSink, + Supplier<Boolean> waitForCondition) + throws Exception { + runAndWaitForResult(env, triggerSource, collectingSink, false, null, waitForCondition); + } + + Configuration runAndWaitForSavepoint( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + CollectingSink<TaskResult> collectingSink, + File savepointDir) + throws Exception { + + return runAndWaitForResult(env, triggerSource, collectingSink, false, savepointDir, () -> true); + } + + void runAndWaitForFailure( + StreamExecutionEnvironment env, + ManualSource<Trigger> triggerSource, + CollectingSink<TaskResult> collectingSink) + throws Exception { + runAndWaitForResult(env, triggerSource, collectingSink, true, null, () -> true); + } + + Configuration runAndWaitForResult( Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org