stevenzwu commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1706431358


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java:
##########
@@ -177,4 +185,288 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrder() throws Exception {
+    table
+        .updateProperties()
        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =
+        env.addSource(
+            createRangeDistributionBoundedSource(createCharRows(numOfCheckpoints, 10)),
+            ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism);
+
+    if (partitioned) {
+      // sort based on partition columns
+      builder.append();
+      env.execute(getClass().getSimpleName());
+
+      table.refresh();
+      // ordered in reverse timeline from the newest snapshot to the oldest snapshot
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+      // only keep the snapshots with added data files
+      snapshots =
+          snapshots.stream()
+              .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+              .collect(Collectors.toList());
+
+      // Sometimes we will have more checkpoints than the bounded source if we pass the
+      // auto checkpoint interval. Thus producing multiple snapshots.
+      assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints);
+    } else {
+      // Range distribution requires either sort order or partition spec defined
+      assertThatThrownBy(builder::append)

Review Comment:
   I see your point. I was hoping to also use `TestTemplate` to test different parallelism values of 1 and 2.
   
   We can still split the test into `testRangeDistributionWithoutSortOrderPartitioned` and `testRangeDistributionWithoutSortOrderUnpartitioned`. But then we would probably need to move away from `@TestTemplate` to method parameterization, and we would no longer be able to leverage `@BeforeEach` and `@AfterEach`. I guess we could instead use `assume` to exclude the `partitioned` dimension in each test after the split?
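   A hedged sketch of that last idea, assuming JUnit 5's `Assumptions` API and the existing `partitioned` field of the test class (method bodies and names here are illustrative, not the actual PR code):
   
   ```java
   import static org.junit.jupiter.api.Assumptions.assumeTrue;
   
   // Keep @TestTemplate so both parallelism values (1 and 2) still run,
   // and abort the parameter combinations that do not apply to each split test.
   @TestTemplate
   public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
     // skip the unpartitioned half of the parameter matrix
     assumeTrue(partitioned);
     // ... partitioned case: expect at least numOfCheckpoints snapshots ...
   }
   
   @TestTemplate
   public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
     // skip the partitioned half of the parameter matrix
     assumeTrue(!partitioned);
     // ... unpartitioned case: expect builder::append to throw ...
   }
   ```
   
   Skipped combinations would be reported as aborted rather than failed, and `@BeforeEach`/`@AfterEach` would keep working unchanged.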



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

