mxm commented on code in PR #15279:
URL: https://github.com/apache/iceberg/pull/15279#discussion_r2798191612


##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java:
##########
@@ -328,4 +351,94 @@ private String createWarehouse() {
       throw new UncheckedIOException(e);
     }
   }
+
+  @TestTemplate
+  public void testCreateDynamicIcebergSink() throws 
DatabaseAlreadyExistException {
+    Map<String, String> tableProps = createTableProps();
+    tableProps.put("use-dynamic-iceberg-sink", "true");
+    tableProps.put(
+        "dynamic-record-generator-impl", SimpleRowDataTableRecordGenerator.class.getName());
+    tableProps.put("table.props.key1", "val1");
+
+    FlinkCatalogFactory factory = new FlinkCatalogFactory();
+    FlinkCatalog flinkCatalog =
+        (FlinkCatalog) factory.createCatalog(catalogName, tableProps, new Configuration());
+    flinkCatalog.createDatabase(
+        databaseName(), new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+
+    // Create table with dynamic sink enabled
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, database_name STRING, table_name STRING) WITH %s",
+        TABLE_NAME + "_dynamic", toWithClause(tableProps));
+
+    // Insert data with database and table information
+    sql(
+        "INSERT INTO %s VALUES (1, 'AAA', '%s', '%s'), (2, 'BBB', '%s', '%s'), (3, 'CCC', '%s', '%s')",
+        TABLE_NAME + "_dynamic",
+        databaseName(),
+        tableName(),
+        databaseName(),
+        tableName(),
+        databaseName(),
+        tableName());
+
+    // Verify the catalog was created and table exists
+    ObjectPath objectPath = new ObjectPath(databaseName(), tableName());
+    assertThat(flinkCatalog.tableExists(objectPath)).isTrue();
+    Table table =
+        flinkCatalog
+            .getCatalogLoader()
+            .loadCatalog()
+            .loadTable(TableIdentifier.of(databaseName(), tableName()));
+    assertThat(table.properties()).containsEntry("key1", "val1");

Review Comment:
   Could we also verify the records written to the table?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java:
##########
@@ -131,6 +140,45 @@ public String factoryIdentifier() {
     return FACTORY_IDENTIFIER;
   }
 
+  private IcebergTableSink getIcebergTableSinkWithDynamicSinkProps(
+      Context context, Configuration flinkConf, Map<String, String> writeProps) {
+    String dynamicRecordGeneratorImpl =
+        flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
+    Preconditions.checkNotNull(
+        dynamicRecordGeneratorImpl,
+        "%s must be specified when use-dynamic-iceberg-sink is true",
+        FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());

Review Comment:
   We have many such tests for Dynamic Sink. Not specifying the record
generator will probably raise an error at creation time, but it would still be
nice to assert on the specific error message reported back to the user. I'll
leave it up to you whether to add that.
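
   A minimal, self-contained sketch of what such a check might assert. It
assumes the option key is `dynamic-record-generator-impl` (taken from the test
properties in this PR) and uses a hypothetical `requireGenerator` helper built
on `Objects.requireNonNull` as a stand-in for the factory's
`Preconditions.checkNotNull` call. It is not the actual Flink or Iceberg test
code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MissingGeneratorMessageSketch {
  // Assumed option keys, copied from the table properties used in the PR's test.
  static final String USE_DYNAMIC_SINK = "use-dynamic-iceberg-sink";
  static final String GENERATOR_IMPL = "dynamic-record-generator-impl";

  // Hypothetical stand-in for the factory's null check: throws a
  // NullPointerException with a formatted message when the option is absent.
  static String requireGenerator(Map<String, String> props) {
    return Objects.requireNonNull(
        props.get(GENERATOR_IMPL),
        String.format("%s must be specified when %s is true", GENERATOR_IMPL, USE_DYNAMIC_SINK));
  }

  // Returns the message of the error raised for the given properties,
  // or null when no error is raised.
  static String errorMessageFor(Map<String, String> props) {
    try {
      requireGenerator(props);
      return null;
    } catch (NullPointerException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(USE_DYNAMIC_SINK, "true"); // generator option deliberately omitted
    System.out.println(errorMessageFor(props));
  }
}
```

   In the real test the exception may be wrapped by Flink's planner, so an
AssertJ check such as `assertThatThrownBy(...)` combined with a
root-cause or message-containing assertion is probably a better fit than
matching the top-level message exactly.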



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

