mxm commented on code in PR #13072:
URL: https://github.com/apache/iceberg/pull/13072#discussion_r2158263584


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java:
##########
@@ -71,34 +92,69 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         !overwrite || context.isBounded(),
         "Unbounded data stream doesn't support overwrite operation.");
 
-    List<String> equalityColumns =
-        tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of);
+    if (resolvedSchema != null) {
+      List<String> equalityColumns =
+          resolvedSchema
+              .getPrimaryKey()
+              .map(UniqueConstraint::getColumns)
+              .orElseGet(ImmutableList::of);
 
-    return new DataStreamSinkProvider() {
-      @Override
-      public DataStreamSink<?> consumeDataStream(
-          ProviderContext providerContext, DataStream<RowData> dataStream) {
-        if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
-          return IcebergSink.forRowData(dataStream)
-              .tableLoader(tableLoader)
-              .tableSchema(tableSchema)
-              .equalityFieldColumns(equalityColumns)
-              .overwrite(overwrite)
-              .setAll(writeProps)
-              .flinkConf(readableConfig)
-              .append();
-        } else {
-          return FlinkSink.forRowData(dataStream)
-              .tableLoader(tableLoader)
-              .tableSchema(tableSchema)
-              .equalityFieldColumns(equalityColumns)
-              .overwrite(overwrite)
-              .setAll(writeProps)
-              .flinkConf(readableConfig)
-              .append();
+      return (DataStreamSinkProvider)
+          (providerContext, dataStream) -> {
+            if (Boolean.TRUE.equals(
+                readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
+              return IcebergSink.forRowData(dataStream)
+                  .tableLoader(tableLoader)
+                  .resolvedSchema(resolvedSchema)
+                  .equalityFieldColumns(equalityColumns)
+                  .overwrite(overwrite)
+                  .setAll(writeProps)
+                  .flinkConf(readableConfig)
+                  .append();
+            } else {
+              return FlinkSink.forRowData(dataStream)
+                  .tableLoader(tableLoader)
+                  .resolvedSchema(resolvedSchema)
+                  .equalityFieldColumns(equalityColumns)
+                  .overwrite(overwrite)
+                  .setAll(writeProps)
+                  .flinkConf(readableConfig)
+                  .append();
+            }
+          };
+    } else {
+      List<String> equalityColumns =
+          tableSchema
+              .getPrimaryKey()
+              .map(org.apache.flink.table.legacy.api.constraints.UniqueConstraint::getColumns)
+              .orElseGet(ImmutableList::of);
+
+      return new DataStreamSinkProvider() {
+        @Override
+        public DataStreamSink<?> consumeDataStream(
+            ProviderContext providerContext, DataStream<RowData> dataStream) {
+          if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
+            return IcebergSink.forRowData(dataStream)
+                .tableLoader(tableLoader)
+                .tableSchema(tableSchema)
+                .equalityFieldColumns(equalityColumns)
+                .overwrite(overwrite)
+                .setAll(writeProps)
+                .flinkConf(readableConfig)
+                .append();
+          } else {
+            return FlinkSink.forRowData(dataStream)
+                .tableLoader(tableLoader)
+                .tableSchema(tableSchema)
+                .equalityFieldColumns(equalityColumns)
+                .overwrite(overwrite)
+                .setAll(writeProps)
+                .flinkConf(readableConfig)
+                .append();

Review Comment:
   NIT: the style here is inconsistent. The if branch uses a lambda for the DataStreamSinkProvider, while the else branch uses an anonymous class.
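   For illustration, a minimal sketch of the else branch rewritten in the same lambda style (builder calls copied from the diff above; one possible shape, not the required fix):
   
   ```java
   return (DataStreamSinkProvider)
       (providerContext, dataStream) -> {
         // Same branching as the anonymous-class version; only the provider
         // construction changes.
         if (Boolean.TRUE.equals(
             readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
           return IcebergSink.forRowData(dataStream)
               .tableLoader(tableLoader)
               .tableSchema(tableSchema)
               .equalityFieldColumns(equalityColumns)
               .overwrite(overwrite)
               .setAll(writeProps)
               .flinkConf(readableConfig)
               .append();
         } else {
           return FlinkSink.forRowData(dataStream)
               .tableLoader(tableLoader)
               .tableSchema(tableSchema)
               .equalityFieldColumns(equalityColumns)
               .overwrite(overwrite)
               .setAll(writeProps)
               .flinkConf(readableConfig)
               .append();
         }
       };
   ```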



##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java:
##########
@@ -369,6 +652,61 @@ public void testConvertFlinkSchemaBaseOnIcebergSchema() {
                Types.NestedField.optional(102, "string", Types.StringType.get())),
             Sets.newHashSet(101));
 
+    ResolvedSchema flinkSchema =
+        new ResolvedSchema(
+            List.of(
+                Column.physical("int", DataTypes.INT().notNull()),
+                Column.physical("string", DataTypes.STRING().nullable())),
+            Collections.emptyList(),
+            UniqueConstraint.primaryKey("pk", List.of("int")));
+    Schema convertedSchema = FlinkSchemaUtil.convert(baseSchema, flinkSchema);
+    assertThat(convertedSchema.asStruct()).isEqualTo(baseSchema.asStruct());
+    assertThat(convertedSchema.identifierFieldIds()).containsExactly(101);
+  }
+
+  @Test
+  public void testConvertFlinkSchemaWithPrimaryKeys() {
+    Schema icebergSchema =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "int", Types.IntegerType.get()),
+                Types.NestedField.required(2, "string", Types.StringType.get())),
+            Sets.newHashSet(1, 2));
+
+    ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(icebergSchema);
+    assertThat(resolvedSchema.getPrimaryKey())
+        .isPresent()
+        .get()
+        .satisfies(k -> assertThat(k.getColumns()).containsExactly("int", "string"));
+  }
+
+  @Test
+  public void testConvertFlinkSchemaWithNestedColumnInPrimaryKeys() {
+    Schema icebergSchema =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(
+                    1,
+                    "struct",
+                    Types.StructType.of(
+                        Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
+            Sets.newHashSet(2));
+
+    assertThatThrownBy(() -> FlinkSchemaUtil.toResolvedSchema(icebergSchema))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Invalid primary key")
+        .hasMessageContaining("Column 'struct.inner' does not exist.");
+  }
+
+  @Test
+  public void testConvertFlinkTableSchemaBaseOnIcebergSchema() {
+    Schema baseSchema =

Review Comment:
   These tests only change one method call; we could use a TestTemplate. On the other hand, removing the old tests will be easier once we remove TableSchema.
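   For reference, a hypothetical sketch of the TestTemplate approach with Iceberg's ParameterizedTestExtension (the class name, the convertViaApi helper, and the parameter wiring are illustrative, not part of this PR):
   
   ```java
   import static org.assertj.core.api.Assertions.assertThat;
   
   import java.util.Arrays;
   import java.util.List;
   import org.apache.iceberg.Parameter;
   import org.apache.iceberg.ParameterizedTestExtension;
   import org.apache.iceberg.Parameters;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.relocated.com.google.common.collect.Lists;
   import org.apache.iceberg.relocated.com.google.common.collect.Sets;
   import org.apache.iceberg.types.Types;
   import org.junit.jupiter.api.TestTemplate;
   import org.junit.jupiter.api.extension.ExtendWith;
   
   @ExtendWith(ParameterizedTestExtension.class)
   class TestFlinkSchemaConversionParameterized {
   
     private static final Schema BASE_SCHEMA =
         new Schema(
             Lists.newArrayList(
                 Types.NestedField.required(101, "int", Types.IntegerType.get()),
                 Types.NestedField.optional(102, "string", Types.StringType.get())),
             Sets.newHashSet(101));
   
     @Parameters(name = "useResolvedSchema = {0}")
     private static List<Object> parameters() {
       return Arrays.asList(true, false);
     }
   
     @Parameter(index = 0) private boolean useResolvedSchema;
   
     // Dispatches to the ResolvedSchema or legacy TableSchema overload of
     // FlinkSchemaUtil.convert, so one test body covers both APIs.
     private Schema convertViaApi(Schema base) {
       return useResolvedSchema
           ? FlinkSchemaUtil.convert(base, FlinkSchemaUtil.toResolvedSchema(base))
           : FlinkSchemaUtil.convert(base, FlinkSchemaUtil.toSchema(base));
     }
   
     @TestTemplate
     public void testConvertFlinkSchemaBaseOnIcebergSchema() {
       Schema converted = convertViaApi(BASE_SCHEMA);
       assertThat(converted.asStruct()).isEqualTo(BASE_SCHEMA.asStruct());
       assertThat(converted.identifierFieldIds()).containsExactly(101);
     }
   }
   ```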



##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java:
##########
@@ -482,6 +536,315 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testOverrideWriteConfigWithUnknownDistributionModeWithTableSchema() {
+    Map<String, String> newProps = Maps.newHashMap();
+    newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
+
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(writeParallelism)
+            .setAll(newProps);
+
+    assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid distribution mode: UNRECOGNIZED");
+  }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrderUnpartitionedWithTableSchema() {
+    assumeThat(partitioned).isFalse();
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =

Review Comment:
   Same for the tests in this class. Duplicating them is probably fine until we remove TableSchema; see the sketch below for the parameterized alternative.
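   If we did want to fold them together, one hypothetical shape (assumes a boolean useTableSchema @Parameter on this test class, a ResolvedSchema-based FlinkSink.forRow overload, and a matching SimpleDataUtil fixture, here named FLINK_RESOLVED_SCHEMA):
   
   ```java
   // Hypothetical sketch: one @TestTemplate covering both schema APIs instead
   // of a duplicated *WithTableSchema variant.
   @TestTemplate
   public void testOverrideWriteConfigWithUnknownDistributionMode() {
     Map<String, String> newProps = Maps.newHashMap();
     newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
   
     List<Row> rows = createRows("");
     DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
   
     FlinkSink.Builder builder =
         (useTableSchema
                 ? FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA)
                 // FLINK_RESOLVED_SCHEMA is an assumed ResolvedSchema fixture.
                 : FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_RESOLVED_SCHEMA))
             .table(table)
             .tableLoader(tableLoader)
             .writeParallelism(writeParallelism)
             .setAll(newProps);
   
     assertThatThrownBy(builder::append)
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
   ```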



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

