mxm commented on code in PR #13260:
URL: https://github.com/apache/iceberg/pull/13260#discussion_r2138136779
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java:
##########
@@ -771,16 +769,23 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode(
       }
     }
+  private int resolveParallelismFor(DataStream<RowData> input) {
+    // if the writeParallelism is not specified, we set the default to the input parallelism to
+    // encourage chaining.
+    if (flinkWriteConf.writeParallelism() == null) {
+      return input.getParallelism();
+    } else {
+      return flinkWriteConf.writeParallelism();
+    }

Review Comment:
NIT
```suggestion
    return Optional.ofNullable(flinkWriteConf.writeParallelism())
        .orElse(input.getParallelism());
```

##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -388,6 +388,28 @@ void testErrorOnNullForRequiredField() throws Exception {
     assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);
   }
+  @TestTemplate
+  void testDefaultWriteParallelism() throws Exception {
+    // since source dataStream parallelism used in the test is 1, we focus on parallelism > 1 so the
+    // assertion makes sense
+    assumeThat(parallelism).isGreaterThan(1);

Review Comment:
Why don't we test the `parallelism == 1` case? It certainly doesn't hurt.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
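The NIT on `resolveParallelismFor` suggests replacing the explicit null check with `Optional.ofNullable(...).orElse(...)`. The sketch below is a minimal, self-contained check that the two forms pick the same value. It assumes, as in the diff, that the configured write parallelism is a nullable `Integer` (null meaning "not set") and that the input parallelism is a plain `int`. The class and method names (`ParallelismResolution`, `resolveWithNullCheck`, `resolveWithOptional`) are hypothetical, and no Flink or Iceberg classes are used.

```java
import java.util.Optional;

/**
 * Hypothetical sketch comparing the two forms discussed for resolveParallelismFor.
 * Assumption: configuredWriteParallelism stands in for flinkWriteConf.writeParallelism()
 * (nullable Integer) and inputParallelism stands in for input.getParallelism() (int).
 */
class ParallelismResolution {

  // Form in the PR: explicit null check, falling back to the input parallelism.
  static int resolveWithNullCheck(Integer configuredWriteParallelism, int inputParallelism) {
    if (configuredWriteParallelism == null) {
      return inputParallelism;
    } else {
      return configuredWriteParallelism; // auto-unboxed to int
    }
  }

  // Form suggested in the NIT. orElse evaluates its argument eagerly;
  // that is harmless here because the fallback is a plain int.
  static int resolveWithOptional(Integer configuredWriteParallelism, int inputParallelism) {
    return Optional.ofNullable(configuredWriteParallelism).orElse(inputParallelism);
  }

  public static void main(String[] args) {
    Integer[] configuredValues = {null, 1, 4};
    int[] inputValues = {1, 2, 8};
    for (Integer configured : configuredValues) {
      for (int input : inputValues) {
        int viaNullCheck = resolveWithNullCheck(configured, input);
        int viaOptional = resolveWithOptional(configured, input);
        // Fail loudly if the two forms ever disagree.
        if (viaNullCheck != viaOptional) {
          throw new AssertionError("mismatch for configured=" + configured + ", input=" + input);
        }
        System.out.println("configured=" + configured + ", input=" + input + " -> " + viaNullCheck);
      }
    }
  }
}
```

One difference worth knowing: `orElse` always evaluates its argument, so the suggested form always calls `input.getParallelism()`, whereas the null-check form calls it only when no write parallelism is configured. For a cheap getter this does not matter; `orElseGet(input::getParallelism)` would be the lazy equivalent. The sample values include 1, since covering that case costs little.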