swapna267 commented on code in PR #15279:
URL: https://github.com/apache/iceberg/pull/15279#discussion_r2825221271


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java:
##########
@@ -88,26 +88,33 @@ public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
     ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
     Map<String, String> writeProps = resolvedCatalogTable.getOptions();
-    ResolvedSchema resolvedSchema =
-        ResolvedSchema.of(
-            resolvedCatalogTable.getResolvedSchema().getColumns().stream()
-                .filter(Column::isPhysical)
-                .collect(Collectors.toList()));
 
-    TableLoader tableLoader;
-    if (catalog != null) {
-      tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+    Configuration flinkConf = new Configuration();
+    writeProps.forEach(flinkConf::setString);
+
+    boolean useDynamicSink = flinkConf.get(FlinkCreateTableOptions.USE_DYNAMIC_ICEBERG_SINK);
+
+    if (useDynamicSink) {
+      return getIcebergTableSinkWithDynamicSinkProps(context, flinkConf, writeProps);
     } else {
-      tableLoader =
-          createTableLoader(
-              resolvedCatalogTable,
-              writeProps,
-              objectIdentifier.getDatabaseName(),
-              objectIdentifier.getObjectName());
-    }
+      TableLoader tableLoader;
+      if (catalog != null) {
+        tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+      } else {
+        tableLoader =
+            createTableLoader(
+                resolvedCatalogTable,
+                writeProps,
+                objectIdentifier.getDatabaseName(),
+                objectIdentifier.getObjectName());
+      }
 
-    return new IcebergTableSink(
-        tableLoader, resolvedSchema, context.getConfiguration(), writeProps);
+      return new IcebergTableSink(
+          tableLoader,
+          resolvedCatalogTable.getResolvedSchema(),

Review Comment:
   I moved this physical-column filtering into 
[IcebergTableSink](https://github.com/swapna267/iceberg/blob/2638c69089331f77044d132be7c94927677ddd33/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java#L139).

   The DynamicSink SQL table is a logical table in the Flink catalog, unlike 
the physical tables that are created in Iceberg, so the filtering now runs 
only on the code path that creates physical tables.
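
   To illustrate the intent, here is a minimal, self-contained sketch. It is not the PR's code: `Col` is a hypothetical stand-in for Flink's `Column`, and `schemaForSink` is a hypothetical helper that only models the idea that physical-column filtering applies to the physical-table path and is skipped for the dynamic sink.

```java
import java.util.List;
import java.util.stream.Collectors;

public class PhysicalColumnFilterSketch {
  // Hypothetical stand-in for Flink's Column; carries only what this sketch needs.
  record Col(String name, boolean physical) {}

  // Keeps only physical columns, analogous to filter(Column::isPhysical) in the removed lines.
  static List<Col> physicalOnly(List<Col> columns) {
    return columns.stream().filter(Col::physical).collect(Collectors.toList());
  }

  // Assumption: the dynamic sink path receives the schema unfiltered,
  // while the physical-table path filters out non-physical columns.
  static List<Col> schemaForSink(List<Col> columns, boolean useDynamicSink) {
    return useDynamicSink ? columns : physicalOnly(columns);
  }

  public static void main(String[] args) {
    List<Col> cols =
        List.of(new Col("id", true), new Col("ts_meta", false), new Col("data", true));
    System.out.println(schemaForSink(cols, false)); // physical-table path
    System.out.println(schemaForSink(cols, true)); // dynamic sink path
  }
}
```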



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

