swapna267 commented on code in PR #16450:
URL: https://github.com/apache/iceberg/pull/16450#discussion_r3382279790
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableRecordGenerator.java:
##########
@@ -28,12 +34,61 @@
public abstract class DynamicTableRecordGenerator implements DynamicRecordGenerator<RowData> {
private final RowType rowType;
+ private final Configuration flinkConfiguration;
+ private final Map<String, String> writeProperties;
+ private final Map<String, Integer> fieldNameToPosition;
- public DynamicTableRecordGenerator(RowType rowType) {
+ public DynamicTableRecordGenerator(
+ RowType rowType, Map<String, String> writeProperties, Configuration flinkConfiguration) {
this.rowType = rowType;
+ this.writeProperties = writeProperties;
+ this.fieldNameToPosition = fieldNameToPositionMapping();
+ this.flinkConfiguration = flinkConfiguration;
}
protected RowType rowType() {
return rowType;
}
+
+ protected Map<String, String> writeProperties() {
+ return writeProperties;
+ }
+
+ protected Configuration flinkConfiguration() {
+ return flinkConfiguration;
+ }
Review Comment:
FlinkDynamicSinkConf doesn't include all the configs needed, for example the
database and table overrides.
Since there can be other implementations of DynamicTableRecordGenerator, I
think it's better to expose this and give implementations that flexibility.
They could extend it to, for example, take a default fallback schema from
the Flink configuration.
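
To illustrate the kind of flexibility meant here, below is a minimal, self-contained sketch rather than the actual PR code. It assumes a plain `Map<String, String>` as a stand-in for Flink's `Configuration`, and the names `SketchRecordGenerator`, `FallbackDatabaseGenerator`, `resolveDatabase`, and the key `sketch.default-database` are all hypothetical. It shows a subclass reading a fallback value through the protected accessor; a fallback schema could be wired in the same way.

```java
import java.util.Map;

// Hypothetical stand-in for the abstract generator in the diff.
// A plain Map replaces Flink's Configuration so the sketch runs without Flink.
abstract class SketchRecordGenerator {
  private final Map<String, String> writeProperties;
  private final Map<String, String> flinkConfiguration;

  SketchRecordGenerator(
      Map<String, String> writeProperties, Map<String, String> flinkConfiguration) {
    this.writeProperties = writeProperties;
    this.flinkConfiguration = flinkConfiguration;
  }

  // Exposed to subclasses, mirroring the accessors added in the diff.
  protected Map<String, String> writeProperties() {
    return writeProperties;
  }

  protected Map<String, String> flinkConfiguration() {
    return flinkConfiguration;
  }

  // Hypothetical hook: decide which database a record is routed to.
  abstract String resolveDatabase(String recordDatabase);
}

// Hypothetical implementation that falls back to a value from the configuration.
class FallbackDatabaseGenerator extends SketchRecordGenerator {
  // Assumed key, not a real Iceberg or Flink option.
  static final String DEFAULT_DATABASE_KEY = "sketch.default-database";

  FallbackDatabaseGenerator(
      Map<String, String> writeProperties, Map<String, String> flinkConfiguration) {
    super(writeProperties, flinkConfiguration);
  }

  @Override
  String resolveDatabase(String recordDatabase) {
    // Prefer the value carried by the record, if there is one.
    if (recordDatabase != null && !recordDatabase.isEmpty()) {
      return recordDatabase;
    }
    // Otherwise use the configured fallback, or "default" if none is set.
    return flinkConfiguration().getOrDefault(DEFAULT_DATABASE_KEY, "default");
  }
}
```

Because the base class only exposes the configuration and does not interpret it, each implementation can choose which keys it cares about without the base class or FlinkDynamicSinkConf having to know about them.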
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]