pvary commented on code in PR #11497:
URL: https://github.com/apache/iceberg/pull/11497#discussion_r2063950514
##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskInfraExtension.java:
##########
@@ -48,16 +48,7 @@ class MaintenanceTaskInfraExtension implements BeforeEachCallback {
@Override
public void beforeEach(ExtensionContext context) {
- env = StreamExecutionEnvironment.getExecutionEnvironment();
- source = new ManualSource<>(env, TypeInformation.of(Trigger.class));
- // Adds the watermark to mimic the behaviour expected for the input of the maintenance tasks
- triggerStream =
- source
- .dataStream()
- .assignTimestampsAndWatermarks(new TableMaintenance.PunctuatedWatermarkStrategy())
- .name(IGNORED_OPERATOR_NAME)
- .forceNonParallel();
- sink = new CollectingSink<>();
+ init(StreamExecutionEnvironment.getExecutionEnvironment());
Review Comment:
Reverted
##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java:
##########
@@ -37,16 +39,54 @@ class MaintenanceTaskTestBase extends OperatorTestBase {
@RegisterExtension MaintenanceTaskInfraExtension infra = new MaintenanceTaskInfraExtension();
void runAndWaitForSuccess(
+ StreamExecutionEnvironment env,
+ ManualSource<Trigger> triggerSource,
+ CollectingSink<TaskResult> collectingSink)
+ throws Exception {
+ runAndWaitForResult(env, triggerSource, collectingSink, false, null, () -> true);
+ }
+
+ void runAndWaitForSuccess(
+ StreamExecutionEnvironment env,
+ ManualSource<Trigger> triggerSource,
+ CollectingSink<TaskResult> collectingSink,
+ Supplier<Boolean> waitForCondition)
+ throws Exception {
+ runAndWaitForResult(env, triggerSource, collectingSink, false, null, waitForCondition);
+ }
+
+ Configuration runAndWaitForSavepoint(
+ StreamExecutionEnvironment env,
+ ManualSource<Trigger> triggerSource,
+ CollectingSink<TaskResult> collectingSink,
+ File savepointDir)
+ throws Exception {
+
+ return runAndWaitForResult(env, triggerSource, collectingSink, false, savepointDir, () -> true);
+ }
+
+ void runAndWaitForFailure(
+ StreamExecutionEnvironment env,
+ ManualSource<Trigger> triggerSource,
+ CollectingSink<TaskResult> collectingSink)
+ throws Exception {
+ runAndWaitForResult(env, triggerSource, collectingSink, true, null, () -> true);
+ }
+
+ Configuration runAndWaitForResult(
Review Comment:
Removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]