stevenzwu commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1706431358
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java:
##########
@@ -177,4 +185,288 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrder() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism);
+
+    if (partitioned) {
+      // sort based on partition columns
+      builder.append();
+      env.execute(getClass().getSimpleName());
+
+      table.refresh();
+      // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+      // only keep the snapshots with added data files
+      snapshots =
+          snapshots.stream()
+              .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+              .collect(Collectors.toList());
+
+      // Sometimes we will have more checkpoints than the bounded source if we pass the
+      // auto checkpoint interval. Thus producing multiple snapshots.
+      assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+    } else {
+      // Range distribution requires either sort order or partition spec defined
+      assertThatThrownBy(builder::append)
Review Comment:
I see your point. I was hoping to also use `@TestTemplate` to test parallelism of 1 and 2.
We can still split the test into `testRangeDistributionWithoutSortOrderPartitioned` and
`testRangeDistributionWithoutSortOrderUnpartitioned`, but then we probably also need to move
away from `@TestTemplate` and `@BeforeEach`. We could use method parameterization, but then we
wouldn't be able to leverage `@BeforeEach` and `@AfterEach` anymore. I guess we could use
`assume` to exclude the `partitioned` dimension after the split?
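A minimal plain-Java sketch of the `assume` idea, assuming both split methods keep the two-parameter template matrix (parallelism 1 and 2 x partitioned true/false). `AssumptionViolated`, `assumeTrue`, `runMatrix`, and `runSplitTests` are hypothetical stand-ins so the sketch runs without JUnit; in the real test class the guard would be JUnit 5's `Assumptions.assumeTrue(partitioned)` or AssertJ's `assumeThat(partitioned).isTrue()`, and the runner would report the guarded-out invocations as aborted/skipped rather than failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class SplitTemplateSketch {
  // Hypothetical stand-in for JUnit 5's TestAbortedException: a failed assumption
  // aborts the current invocation instead of failing it.
  static final class AssumptionViolated extends RuntimeException {
    AssumptionViolated(String message) {
      super(message);
    }
  }

  // Hypothetical stand-in for Assumptions.assumeTrue(boolean, String).
  static void assumeTrue(boolean condition, String message) {
    if (!condition) {
      throw new AssumptionViolated(message);
    }
  }

  // Runs one test body over the (parallelism x partitioned) matrix, as a
  // two-parameter @TestTemplate would, and records one status per invocation.
  static List<String> runMatrix(String name, BiConsumer<Integer, Boolean> body) {
    List<String> results = new ArrayList<>();
    for (int parallelism : new int[] {1, 2}) {
      for (boolean partitioned : new boolean[] {true, false}) {
        String id = name + "[parallelism=" + parallelism + ", partitioned=" + partitioned + "]";
        try {
          body.accept(parallelism, partitioned);
          results.add(id + " PASSED");
        } catch (AssumptionViolated e) {
          results.add(id + " SKIPPED");
        }
      }
    }
    return results;
  }

  // The two split tests: each guards on the partitioned flag and still covers
  // both parallelism values.
  static List<String> runSplitTests() {
    List<String> results = new ArrayList<>();
    results.addAll(
        runMatrix(
            "testRangeDistributionWithoutSortOrderPartitioned",
            (parallelism, partitioned) -> {
              assumeTrue(partitioned, "only applies to a partitioned table");
              // real test: append, execute the job, and check the snapshots
            }));
    results.addAll(
        runMatrix(
            "testRangeDistributionWithoutSortOrderUnpartitioned",
            (parallelism, partitioned) -> {
              assumeTrue(!partitioned, "only applies to an unpartitioned table");
              // real test: assert that builder.append() throws
            }));
    return results;
  }

  public static void main(String[] args) {
    runSplitTests().forEach(System.out::println);
  }
}
```

Each split method keeps the parallelism dimension and the shared `@BeforeEach`/`@AfterEach` setup, at the cost of two skipped invocations per method; whether that noise is acceptable versus switching to method parameterization is the trade-off raised in the comment above.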
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.