stevenzwu commented on code in PR #10393:
URL: https://github.com/apache/iceberg/pull/10393#discussion_r1619139980
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -48,10 +52,17 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
@Override
protected IcebergSource.Builder<RowData> sourceBuilder() {
- return IcebergSource.<RowData>builder()
+ Configuration config = new Configuration();
+ config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+ return IcebergSource.forRowData()
.tableLoader(sourceTableResource.tableLoader())
.watermarkColumn("ts")
- .project(TestFixtures.TS_SCHEMA);
+ .project(TestFixtures.TS_SCHEMA)
+ // Prevent combining splits
+ .set(
Review Comment:
Ah, we missed the bundling earlier.
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
private static class RecordCounterToFail {
private static AtomicInteger records;
- private static CompletableFuture<Void> fail;
+ private static CountDownLatch countDownLatch;
private static CompletableFuture<Void> continueProcessing;
private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
records = new AtomicInteger();
- fail = new CompletableFuture<>();
continueProcessing = new CompletableFuture<>();
+ countDownLatch = new CountDownLatch(stream.getParallelism());
Review Comment:
Curious how the countdown latch is used by the map operator. Will it be
serialized, with a new cloned object deserialized on the operator/subtask
side? If yes, I guess there is no difference from the CompletableFuture. If
no, this ensures all subtasks are running and have processed some records
before the failure is triggered.
Could this be verified with a breakpoint in the IDE?
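One way to reason about the serialization question without a debugger: Flink ships the function passed to `map` to each subtask via Java serialization (Flink's `Function` interface extends `Serializable`), but Java serialization writes only a lambda's captured values (here `failAfter`), never static fields such as `records`, `countDownLatch`, or `continueProcessing`. The plain-Java sketch below has no Flink dependency; `SerializableFunction`, `roundTrip`, and `run` are hypothetical stand-ins for the `MapFunction` and for shipping it to two subtasks. It assumes every subtask runs in the test JVM and sees the same loaded class, as is typical for a `MiniCluster`-based test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class StaticFieldCaptureDemo {
  // Static state, like RecordCounterToFail.records: owned by the class, never written by serialization.
  static AtomicInteger records = new AtomicInteger();

  // Hypothetical stand-in for Flink's MapFunction, which is also Serializable.
  interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

  // Hypothetical stand-in for shipping a function to a subtask: serialize, then deserialize a copy.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  static int run() throws IOException, ClassNotFoundException {
    records.set(0);
    int failAfter = 2; // captured local: written into the serialized lambda
    SerializableFunction<String, Boolean> mapper = record -> records.incrementAndGet() > failAfter;

    // Two "subtasks", each holding its own deserialized copy of the function.
    SerializableFunction<String, Boolean> subtask1 = roundTrip(mapper);
    SerializableFunction<String, Boolean> subtask2 = roundTrip(mapper);
    subtask1.apply("a");
    subtask2.apply("b");
    subtask1.apply("c");

    // Both copies updated the single static counter loaded in this JVM.
    return records.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // prints 3
  }
}
```

If the same holds in the real test, the latch is shared rather than cloned, which points to the "if no" case above; a cluster with separate TaskManager JVMs would not share it.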
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
private static class RecordCounterToFail {
private static AtomicInteger records;
- private static CompletableFuture<Void> fail;
+ private static CountDownLatch countDownLatch;
private static CompletableFuture<Void> continueProcessing;
private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
records = new AtomicInteger();
- fail = new CompletableFuture<>();
continueProcessing = new CompletableFuture<>();
+ countDownLatch = new CountDownLatch(stream.getParallelism());
return stream.map(
record -> {
boolean reachedFailPoint = records.incrementAndGet() > failAfter;
- boolean notFailedYet = !fail.isDone();
+ boolean notFailedYet = countDownLatch.getCount() != 0;
if (notFailedYet && reachedFailPoint) {
- fail.complete(null);
+ countDownLatch.countDown();
continueProcessing.get();
}
return record;
});
}
- private static void waitToFail() throws ExecutionException, InterruptedException {
- fail.get();
+ private static void waitToFail() throws InterruptedException {
Review Comment:
I also just realized that this method name is probably no longer accurate
given its use in `testBoundedWithSavepoint`. Maybe it should be called
`waitForCondition`? Also, the inner class `RecordCounterToFail` could be
renamed to `RecordCounterToWait`.
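A minimal plain-Java simulation of the latch-based wait in the diff above, using the names suggested in this comment. Threads stand in for parallel map subtasks and all share the same static fields (assumed, as in a single-JVM `MiniCluster` test). `RecordCounterToWait`, `waitForCondition`, `wrapWithWaitAfter`, `parked`, and `run` are hypothetical, not code from the PR. The sketch shows that `waitForCondition()` returns only once every subtask is blocked at the wait point, whereas the old single `CompletableFuture` was completed by whichever subtask got there first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

public class RecordCounterToWait {
  private static AtomicInteger records;
  private static AtomicInteger parked; // simulated subtasks currently blocked in the mapper
  private static CountDownLatch countDownLatch;
  private static CompletableFuture<Void> continueProcessing;

  // Same counting logic as the diff's map function, minus Flink.
  static UnaryOperator<Integer> wrapWithWaitAfter(int parallelism, int waitAfter) {
    records = new AtomicInteger();
    parked = new AtomicInteger();
    continueProcessing = new CompletableFuture<>();
    countDownLatch = new CountDownLatch(parallelism);
    return record -> {
      boolean reachedWaitPoint = records.incrementAndGet() > waitAfter;
      boolean notReleasedYet = countDownLatch.getCount() != 0;
      if (notReleasedYet && reachedWaitPoint) {
        parked.incrementAndGet(); // before countDown(), so the thread in await() observes it
        countDownLatch.countDown();
        continueProcessing.join(); // join() not get(): UnaryOperator cannot throw checked exceptions
        parked.decrementAndGet();
      }
      return record;
    };
  }

  static void waitForCondition() throws InterruptedException {
    countDownLatch.await(); // returns only after `parallelism` countDown() calls
  }

  /** Returns {subtasks parked when waitForCondition() returned, total records mapped}. */
  static int[] run(int parallelism, int waitAfter, int recordsPerSubtask)
      throws InterruptedException {
    // Requires recordsPerSubtask > waitAfter; otherwise a subtask may finish without
    // reaching the wait point and the latch never reaches zero.
    UnaryOperator<Integer> mapper = wrapWithWaitAfter(parallelism, waitAfter);
    List<Thread> subtasks = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      Thread subtask =
          new Thread(
              () -> {
                for (int r = 0; r < recordsPerSubtask; r++) {
                  mapper.apply(r);
                }
              });
      subtasks.add(subtask);
      subtask.start();
    }
    waitForCondition();
    int parkedAfterAwait = parked.get();
    // The real test would trigger the failover or savepoint here, while every subtask is blocked.
    continueProcessing.complete(null);
    for (Thread subtask : subtasks) {
      subtask.join();
    }
    return new int[] {parkedAfterAwait, records.get()};
  }

  public static void main(String[] args) throws InterruptedException {
    int[] result = run(4, 5, 100);
    System.out.println("parked after await: " + result[0]); // 4
    System.out.println("records mapped: " + result[1]); // 400
  }
}
```

One consequence of this design: each subtask must still have records left to process once the shared counter passes the threshold, or the wait never returns. `CountDownLatch.await(long, TimeUnit)` could be used instead if the test should fail fast rather than hang.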
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -48,10 +52,17 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
@Override
protected IcebergSource.Builder<RowData> sourceBuilder() {
- return IcebergSource.<RowData>builder()
+ Configuration config = new Configuration();
+ config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
Review Comment:
Is this config necessary?
--
This is an automated message from the Apache Git Service.