rodmeneses commented on code in PR #14063:
URL: https://github.com/apache/iceberg/pull/14063#discussion_r2349793841
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java:
##########
@@ -87,4 +87,8 @@ private FlinkWriteOptions() {}
@Experimental
public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
+
+ // specify the uidSuffix to be used for the underlying IcebergSink
+ public static final ConfigOption<String> UID_SUFFIX =
+ ConfigOptions.key("uid-suffix").stringType().noDefaultValue();
Review Comment:
Thanks @pvary for the analysis. From what I can see in the code, there are two code paths that create the `Function<String, DataStream<RowData>> inputCreator` instance:
1. In `forRowData`:
```java
private Builder forRowData(DataStream<RowData> newRowDataInput) {
  this.inputCreator = ignored -> newRowDataInput;
  return this;
}
```
2. In `forMapperOutputType`, at the location you shared above:
```java
this.inputCreator =
    newUidSuffix -> {
      // Input stream order is crucial for some situation(e.g. in cdc case). Therefore, we
      // need to set the parallelism of map operator same as its input to keep map operator
      // chaining its input, and avoid rebalanced by default.
      SingleOutputStreamOperator<RowData> inputStream =
          input.map(mapper, outputType).setParallelism(input.getParallelism());
      if (newUidSuffix != null) {
        String uid = String.format("Sink pre-writer mapper: %s", newUidSuffix);
        inputStream.name(uid).uid(uid);
      }

      return inputStream;
    };
```
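For readers without the Iceberg sources at hand, here is a minimal, Flink-free sketch of how the two paths treat the suffix. It is a simplified model under stated assumptions, not the builder's code: `FakeStream` is a hypothetical stand-in for `DataStream<RowData>` that only records the uid it was given, and the two factory methods only mimic the suffix handling of `forRowData` and `forMapperOutputType`.

```java
import java.util.function.Function;

public class InputCreatorSketch {

  /** Hypothetical stand-in for a DataStream: it only tracks the uid assigned to it, if any. */
  static final class FakeStream {
    final String uid;

    FakeStream(String uid) {
      this.uid = uid;
    }
  }

  /** Models path 1 (forRowData): the suffix is ignored and the pre-built stream is returned. */
  static Function<String, FakeStream> forRowData(FakeStream existing) {
    return ignored -> existing;
  }

  /** Models path 2 (forMapperOutputType): a non-null suffix names the pre-writer mapper. */
  static Function<String, FakeStream> forMapperOutputType() {
    return newUidSuffix -> {
      if (newUidSuffix != null) {
        return new FakeStream(String.format("Sink pre-writer mapper: %s", newUidSuffix));
      }
      // Without a suffix the mapper gets no explicit uid in this model.
      return new FakeStream(null);
    };
  }

  public static void main(String[] args) {
    FakeStream rowData = forRowData(new FakeStream(null)).apply("my-suffix");
    FakeStream mapped = forMapperOutputType().apply("my-suffix");
    System.out.println(rowData.uid); // null: path 1 ignored the suffix
    System.out.println(mapped.uid); // Sink pre-writer mapper: my-suffix
  }
}
```

The comparison shows that path 1 discards whatever suffix it receives, so only path 2 ever consults it, and only path 2 contains the `null` check.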
The function's `apply` method is called only once, in `append()`:
```java
public DataStreamSink<RowData> append() {
  IcebergSink sink = build();
  String suffix = defaultSuffix(uidSuffix, table.name());
  DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
  // ...
}
```
This means the suffix passed to the function is never `null`, so the check
```java
if (newUidSuffix != null) {
```
is completely unnecessary.
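The argument rests on `defaultSuffix` never returning `null`. Below is a minimal sketch, assuming the helper returns the configured suffix when one is set and otherwise falls back to the table name; the exact `null`/empty handling is an assumption, not copied from the Iceberg source, and the table name is hypothetical.

```java
public class DefaultSuffixSketch {

  /**
   * Assumed behaviour of the builder's defaultSuffix helper: use the configured uidSuffix when
   * present, otherwise fall back to the supplied default (the table name in append()).
   */
  static String defaultSuffix(String uidSuffix, String fallback) {
    if (uidSuffix == null || uidSuffix.isEmpty()) {
      return fallback;
    }
    return uidSuffix;
  }

  public static void main(String[] args) {
    String tableName = "db.sample"; // hypothetical table name
    System.out.println(defaultSuffix(null, tableName)); // db.sample
    System.out.println(defaultSuffix("", tableName)); // db.sample
    System.out.println(defaultSuffix("custom", tableName)); // custom
  }
}
```

Under this assumption the result can only be `null` if the fallback itself is `null`, which would require `table.name()` to return `null`.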
Whether that check should stay is a separate question, which we can settle outside the scope of this PR.
As for the second question, regarding the use of `defaultSuffix`: this PR adds a way to specify the `uidSuffix` at the `IcebergSink` level. The suffix is then propagated to, and used by, all of the downstream operators that `IcebergSink` creates, including `Sink pre-writer mapper: {}` and `shuffle-{}`.
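To illustrate what "propagated" means here, a minimal sketch: a single suffix, for example one supplied through the new `uid-suffix` option, determines the uid of every derived operator. `operatorUids` is a hypothetical helper, and only the two name patterns quoted above are modelled; the real sink may derive uids for more operators and may format them differently.

```java
import java.util.List;

public class UidSuffixPropagationSketch {

  /**
   * Hypothetical helper: derives the uids of the operators a sink creates from one suffix.
   * Only the "Sink pre-writer mapper" and "shuffle" patterns from the review are modelled.
   */
  static List<String> operatorUids(String uidSuffix) {
    return List.of(
        String.format("Sink pre-writer mapper: %s", uidSuffix),
        String.format("shuffle-%s", uidSuffix));
  }

  public static void main(String[] args) {
    // Changing the single suffix changes every derived uid together.
    System.out.println(operatorUids("orders-v2"));
    // prints [Sink pre-writer mapper: orders-v2, shuffle-orders-v2]
  }
}
```

Tying all derived uids to one configurable value keeps them stable across job changes as long as the suffix stays the same, which is what Flink relies on to map operator state when restoring from a savepoint.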
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.