mxm commented on code in PR #13072: URL: https://github.com/apache/iceberg/pull/13072#discussion_r2158263584
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java:
##########
@@ -71,34 +92,69 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         !overwrite || context.isBounded(),
         "Unbounded data stream doesn't support overwrite operation.");
-    List<String> equalityColumns =
-        tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of);
+    if (resolvedSchema != null) {
+      List<String> equalityColumns =
+          resolvedSchema
+              .getPrimaryKey()
+              .map(UniqueConstraint::getColumns)
+              .orElseGet(ImmutableList::of);
-    return new DataStreamSinkProvider() {
-      @Override
-      public DataStreamSink<?> consumeDataStream(
-          ProviderContext providerContext, DataStream<RowData> dataStream) {
-        if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
-          return IcebergSink.forRowData(dataStream)
-              .tableLoader(tableLoader)
-              .tableSchema(tableSchema)
-              .equalityFieldColumns(equalityColumns)
-              .overwrite(overwrite)
-              .setAll(writeProps)
-              .flinkConf(readableConfig)
-              .append();
-        } else {
-          return FlinkSink.forRowData(dataStream)
-              .tableLoader(tableLoader)
-              .tableSchema(tableSchema)
-              .equalityFieldColumns(equalityColumns)
-              .overwrite(overwrite)
-              .setAll(writeProps)
-              .flinkConf(readableConfig)
-              .append();
+      return (DataStreamSinkProvider)
+          (providerContext, dataStream) -> {
+            if (Boolean.TRUE.equals(
+                readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
+              return IcebergSink.forRowData(dataStream)
+                  .tableLoader(tableLoader)
+                  .resolvedSchema(resolvedSchema)
+                  .equalityFieldColumns(equalityColumns)
+                  .overwrite(overwrite)
+                  .setAll(writeProps)
+                  .flinkConf(readableConfig)
+                  .append();
+            } else {
+              return FlinkSink.forRowData(dataStream)
+                  .tableLoader(tableLoader)
+                  .resolvedSchema(resolvedSchema)
+                  .equalityFieldColumns(equalityColumns)
+                  .overwrite(overwrite)
+                  .setAll(writeProps)
+                  .flinkConf(readableConfig)
+                  .append();
+            }
+          };
+    } else {
+      List<String> equalityColumns =
+          tableSchema
+              .getPrimaryKey()
+              .map(org.apache.flink.table.legacy.api.constraints.UniqueConstraint::getColumns)
+              .orElseGet(ImmutableList::of);
+
+      return new DataStreamSinkProvider() {
+        @Override
+        public DataStreamSink<?> consumeDataStream(
+            ProviderContext providerContext, DataStream<RowData> dataStream) {
+          if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
+            return IcebergSink.forRowData(dataStream)
+                .tableLoader(tableLoader)
+                .tableSchema(tableSchema)
+                .equalityFieldColumns(equalityColumns)
+                .overwrite(overwrite)
+                .setAll(writeProps)
+                .flinkConf(readableConfig)
+                .append();
+          } else {
+            return FlinkSink.forRowData(dataStream)
+                .tableLoader(tableLoader)
+                .tableSchema(tableSchema)
+                .equalityFieldColumns(equalityColumns)
+                .overwrite(overwrite)
+                .setAll(writeProps)
+                .flinkConf(readableConfig)
+                .append();

Review Comment:
   NIT: the style is inconsistent here. The new ResolvedSchema branch uses a lambda for the DataStreamSinkProvider, while the else block still uses an anonymous class.
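Editor's note: purely as an illustration of the nit above, the legacy TableSchema branch could reuse the same lambda form as the ResolvedSchema branch. The builder calls below are copied from the else block in the diff; this is only a sketch of the suggested style, not the change committed in the PR.

    // Sketch: else branch of getSinkRuntimeProvider() rewritten in the same
    // lambda style as the ResolvedSchema branch (illustrative only). The
    // Boolean.TRUE.equals(...) null-safe check mirrors the new branch.
    return (DataStreamSinkProvider)
        (providerContext, dataStream) -> {
          if (Boolean.TRUE.equals(
              readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
            return IcebergSink.forRowData(dataStream)
                .tableLoader(tableLoader)
                .tableSchema(tableSchema)
                .equalityFieldColumns(equalityColumns)
                .overwrite(overwrite)
                .setAll(writeProps)
                .flinkConf(readableConfig)
                .append();
          } else {
            return FlinkSink.forRowData(dataStream)
                .tableLoader(tableLoader)
                .tableSchema(tableSchema)
                .equalityFieldColumns(equalityColumns)
                .overwrite(overwrite)
                .setAll(writeProps)
                .flinkConf(readableConfig)
                .append();
          }
        };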
##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java:
##########
@@ -369,6 +652,61 @@ public void testConvertFlinkSchemaBaseOnIcebergSchema() {
             Types.NestedField.optional(102, "string", Types.StringType.get())),
         Sets.newHashSet(101));
+    ResolvedSchema flinkSchema =
+        new ResolvedSchema(
+            List.of(
+                Column.physical("int", DataTypes.INT().notNull()),
+                Column.physical("string", DataTypes.STRING().nullable())),
+            Collections.emptyList(),
+            UniqueConstraint.primaryKey("pk", List.of("int")));
+    Schema convertedSchema = FlinkSchemaUtil.convert(baseSchema, flinkSchema);
+    assertThat(convertedSchema.asStruct()).isEqualTo(baseSchema.asStruct());
+    assertThat(convertedSchema.identifierFieldIds()).containsExactly(101);
+  }
+
+  @Test
+  public void testConvertFlinkSchemaWithPrimaryKeys() {
+    Schema icebergSchema =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(1, "int", Types.IntegerType.get()),
+                Types.NestedField.required(2, "string", Types.StringType.get())),
+            Sets.newHashSet(1, 2));
+
+    ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(icebergSchema);
+    assertThat(resolvedSchema.getPrimaryKey())
+        .isPresent()
+        .get()
+        .satisfies(k -> assertThat(k.getColumns()).containsExactly("int", "string"));
+  }
+
+  @Test
+  public void testConvertFlinkSchemaWithNestedColumnInPrimaryKeys() {
+    Schema icebergSchema =
+        new Schema(
+            Lists.newArrayList(
+                Types.NestedField.required(
+                    1,
+                    "struct",
+                    Types.StructType.of(
+                        Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
+            Sets.newHashSet(2));
+
+    assertThatThrownBy(() -> FlinkSchemaUtil.toResolvedSchema(icebergSchema))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Invalid primary key")
+        .hasMessageContaining("Column 'struct.inner' does not exist.");
+  }
+
+  @Test
+  public void testConvertFlinkTableSchemaBaseOnIcebergSchema() {
+    Schema baseSchema =

Review Comment:
   These tests only change one method call. We could use TestTemplate (sketched below). On the other hand, removing the old tests will be easier once we remove TableSchema.

##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java:
##########
@@ -482,6 +536,315 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
     }
   }
+  @TestTemplate
+  public void testOverrideWriteConfigWithUnknownDistributionModeWithTableSchema() {
+    Map<String, String> newProps = Maps.newHashMap();
+    newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
+
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_TABLE_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(writeParallelism)
+            .setAll(newProps);
+
+    assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid distribution mode: UNRECOGNIZED");
+  }
+
+  @TestTemplate
+  public void testRangeDistributionWithoutSortOrderUnpartitionedWithTableSchema() {
+    assumeThat(partitioned).isFalse();
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    int numOfCheckpoints = 6;
+    DataStream<Row> dataStream =

Review Comment:
   The same applies to the tests in this class. Duplicating them is probably fine until we remove TableSchema.

-- 
This is an automated message from the Apache Git Service.
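Editor's note: for reference, a minimal sketch of the TestTemplate idea mentioned above: one test body parameterized over which Flink schema API is exercised. It assumes Iceberg's ParameterizedTestExtension helper (presumably what backs the @TestTemplate tests in this PR) and that the legacy TableSchema lives under org.apache.flink.table.legacy.api in Flink 2.0. The class name and the two private schema helpers are made up for illustration; none of this is code from the PR.

    // Sketch only: run the same assertions against both schema APIs so they are
    // written once. Only the overload picked in the ternary differs per parameter.
    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.UniqueConstraint;
    import org.apache.flink.table.legacy.api.TableSchema; // assumed Flink 2.0 legacy location
    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.ParameterizedTestExtension;
    import org.apache.iceberg.Parameters;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.Types;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ParameterizedTestExtension.class)
    public class TestFlinkSchemaConversionVariants {

      @Parameters(name = "useResolvedSchema = {0}")
      private static List<Object> parameters() {
        return Arrays.asList(true, false);
      }

      @Parameter(index = 0)
      private boolean useResolvedSchema;

      @TestTemplate
      public void testConvertFlinkSchemaBaseOnIcebergSchema() {
        Schema baseSchema =
            new Schema(
                Lists.newArrayList(
                    Types.NestedField.required(101, "int", Types.IntegerType.get()),
                    Types.NestedField.optional(102, "string", Types.StringType.get())),
                Sets.newHashSet(101));

        // The duplicated tests differ only in which convert() overload they call.
        Schema convertedSchema =
            useResolvedSchema
                ? FlinkSchemaUtil.convert(baseSchema, resolvedSchema())
                : FlinkSchemaUtil.convert(baseSchema, legacyTableSchema());

        assertThat(convertedSchema.asStruct()).isEqualTo(baseSchema.asStruct());
        assertThat(convertedSchema.identifierFieldIds()).containsExactly(101);
      }

      // Hypothetical helper: the ResolvedSchema used by the new-style test in the PR.
      private static ResolvedSchema resolvedSchema() {
        return new ResolvedSchema(
            List.of(
                Column.physical("int", DataTypes.INT().notNull()),
                Column.physical("string", DataTypes.STRING().nullable())),
            Collections.emptyList(),
            UniqueConstraint.primaryKey("pk", List.of("int")));
      }

      // Hypothetical helper: the equivalent legacy TableSchema.
      private static TableSchema legacyTableSchema() {
        return TableSchema.builder()
            .field("int", DataTypes.INT().notNull())
            .field("string", DataTypes.STRING())
            .primaryKey("int")
            .build();
      }
    }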
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org