rodmeneses commented on code in PR #14063:
URL: https://github.com/apache/iceberg/pull/14063#discussion_r2349793841


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java:
##########
@@ -87,4 +87,8 @@ private FlinkWriteOptions() {}
   @Experimental
   public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
       ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
+
+  //  specify the uidSuffix to be used for the underlying IcebergSink
+  public static final ConfigOption<String> UID_SUFFIX =
+      ConfigOptions.key("uid-suffix").stringType().noDefaultValue();

Review Comment:
   Thanks @pvary for the analysis. From what I can see in the code,
   there are two code paths that create the `Function<String, DataStream<RowData>> inputCreator` instance:
   
   1.
   ```java
   private Builder forRowData(DataStream<RowData> newRowDataInput) {
     this.inputCreator = ignored -> newRowDataInput;
     return this;
   }
   ```
   
   2.
   This happens in `forMapperOutputType`, at the location you shared above:
   ```java
   this.inputCreator =
       newUidSuffix -> {
         // Input stream order is crucial for some situation(e.g. in cdc case). Therefore, we
         // need to set the parallelism of map operator same as its input to keep map operator
         // chaining its input, and avoid rebalanced by default.
         SingleOutputStreamOperator<RowData> inputStream =
             input.map(mapper, outputType).setParallelism(input.getParallelism());
         if (newUidSuffix != null) {
           String uid = String.format("Sink pre-writer mapper: %s", newUidSuffix);
           inputStream.name(uid).uid(uid);
         }
   ```
   
   The function's `apply` method is called only once, in `append()`:
   ```java
   public DataStreamSink<RowData> append() {
     IcebergSink sink = build();
     String suffix = defaultSuffix(uidSuffix, table.name());
     DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
   ```
   
   Since the suffix always goes through `defaultSuffix` first, the value passed in is never null, which makes the check
   ```java
   if (newUidSuffix != null) {
   ```
   completely unnecessary.
   Whether to keep that check is a fair question, but we can settle it separately, outside this PR.
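   
   To make that reasoning concrete, here is a minimal, Flink-free sketch. It assumes `defaultSuffix` returns the fallback (the table name) whenever the configured suffix is null or empty; that behavior is inferred from the call site, not quoted from the source. The class `NullCheckSketch` and its `String`-returning creator are hypothetical stand-ins for the real builder.
   
   ```java
   import java.util.function.Function;

   class NullCheckSketch {
     // Assumed behavior: fall back to the default when no suffix was configured.
     static String defaultSuffix(String uidSuffix, String fallback) {
       if (uidSuffix == null || uidSuffix.isEmpty()) {
         return fallback;
       }
       return uidSuffix;
     }

     // Hypothetical stand-in for inputCreator: returns the operator uid
     // instead of a DataStream, so no Flink dependency is needed.
     static final Function<String, String> INPUT_CREATOR =
         newUidSuffix -> String.format("Sink pre-writer mapper: %s", newUidSuffix);

     // Mirrors the single call site in append(): the creator only ever
     // receives a suffix that has already been defaulted.
     static String append(String uidSuffix, String tableName) {
       String suffix = defaultSuffix(uidSuffix, tableName);
       return INPUT_CREATOR.apply(suffix);
     }

     public static void main(String[] args) {
       System.out.println(append(null, "db.tbl"));
       System.out.println(append("my-job", "db.tbl"));
     }
   }
   ```
   
   Under this assumption the creator can only receive null if the table name itself is null, which is why the inner null check looks redundant.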
   
   As for the second question, about the use of `defaultSuffix`: this PR adds a way to specify the `uidSuffix` at the `IcebergSink` level. The suffix is propagated to all of the downstream operators that the `IcebergSink` creates, including but not limited to `Sink pre-writer mapper: {}` and `shuffle-{}`.
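   
   As a rough illustration of that propagation, the sketch below derives several operator uids from one suffix. Only the two name patterns `Sink pre-writer mapper: {}` and `shuffle-{}` come from this discussion; the class name, method name, and map keys are hypothetical, and no Flink API is used.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   class SuffixPropagationSketch {
     // Builds the uids of the operators named in this thread from a single suffix.
     // The keys are illustrative labels, not identifiers from the Iceberg code base.
     static Map<String, String> operatorUids(String uidSuffix) {
       Map<String, String> uids = new LinkedHashMap<>();
       uids.put("preWriterMapper", String.format("Sink pre-writer mapper: %s", uidSuffix));
       uids.put("shuffle", String.format("shuffle-%s", uidSuffix));
       return uids;
     }

     public static void main(String[] args) {
       // Every derived uid carries the same user-supplied suffix.
       operatorUids("my-job").forEach((op, uid) -> System.out.println(op + " -> " + uid));
     }
   }
   ```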
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

