mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3009894410


##########
docs/docs/flink-writes.md:
##########
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for every record:
 | `Schema`           | The schema of the record.                               |
 | `Spec`             | The expected partitioning specification for the record. |
 | `RowData`          | The actual row data to be written.                      |
-| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH).           |
+| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or optional). When unspecified, the record won't be shuffled at all. |

Review Comment:
   I think it is better to be explicit, despite the new constructor, which 
somewhat hides this implementation detail. It's not completely hidden, though, 
because users can retrieve the value via `DynamicRecord#distributionMode()`.



##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -411,12 +488,32 @@ public DataStreamSink<DynamicRecordInternal> append() {
               .name(operatorName("generator"))
               .returns(type);
 
-      DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
+      // Forward writer: chained with generator via forward edge, no data shuffle
+      ForwardWriterSink forwardWriterSink =
+          new ForwardWriterSink(
+              sink.catalogLoader, sink.writeProperties, sink.flinkConfig, sink.cacheMaximumSize);
+      TypeInformation<CommittableMessage<DynamicWriteResult>> writeResultTypeInfo =
+          CommittableMessageTypeInfo.of(sink::getWriteResultSerializer);
+
+      DataStream<CommittableMessage<DynamicWriteResult>> forwardWritten =
+          converted
+              .getSideOutput(
+                  new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+              .transform(
+                  operatorName("Forward-Writer"),
+                  writeResultTypeInfo,
+                  new SinkWriterOperatorFactory<>(forwardWriterSink))
+              .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
+
+      // Inject forward write results into sink — they'll be unioned in 
addPreCommitTopology
+      sink.setForwardWriteResults(forwardWritten);

Review Comment:
   Why are we not creating the `forwardWritten` DynamicSink in the `build()` 
method? Then we wouldn't have to set it here and could pass it directly to 
`DynamicIcebergSink`.



##########
docs/docs/flink-writes.md:
##########
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)`                 | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns).                  |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is routed from the processor to the writer:
+
+| Mode          | Behavior |
+|---------------|----------|
+| `NONE`        | Records are distributed across writer subtasks in a round-robin fashion (or by equality fields if set). |
+| `HASH`        | Records are distributed by partition key (partitioned tables) or equality fields (unpartitioned tables). Ensures that records for the same partition are handled by the same writer subtask. |
+| (unspecified) | Forward mode: bypasses distribution entirely and sends records directly via a forward edge (see below). |

Review Comment:
   ```suggestion
   | `null` | Forward mode: bypasses distribution entirely and sends records directly via a forward edge (see below). |
   ```
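
To make the `null` case concrete, here is a minimal sketch of the mapping the table describes. It is an illustration only: `Mode`, `Route`, and `routeFor` are hypothetical stand-ins, not the Iceberg or Flink classes, and the "or by equality fields if set" variant of `NONE` is left out for brevity.

```java
public class DistributionRoutingSketch {

  // Hypothetical stand-in for the two distribution modes named in the docs table.
  enum Mode { NONE, HASH }

  // Hypothetical labels for the three routing outcomes the table describes.
  enum Route { FORWARD, ROUND_ROBIN, HASH_BY_KEY }

  // Maps a nullable mode to a route; a null mode selects the forward path.
  static Route routeFor(Mode mode) {
    if (mode == null) {
      return Route.FORWARD;
    }
    switch (mode) {
      case NONE:
        return Route.ROUND_ROBIN;
      case HASH:
        return Route.HASH_BY_KEY;
      default:
        throw new IllegalArgumentException("Unsupported mode: " + mode);
    }
  }

  public static void main(String[] args) {
    System.out.println(routeFor(null));
    System.out.println(routeFor(Mode.NONE));
    System.out.println(routeFor(Mode.HASH));
  }
}
```

Spelling out `null` in the table matches what a caller would observe when reading the value back from the record.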



##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer()
     return new DynamicWriteResultSerializer();
   }
 
+  /**
+   * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the forward write path.
+   * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} emits committables
+   * downstream. The committer is never called — committing is handled by the 
main sink.
+   */
+  @VisibleForTesting
+  static class ForwardWriterSink
+      implements Sink<DynamicRecordInternal>, SupportsCommitter<DynamicWriteResult> {
+
+    private final CatalogLoader catalogLoader;
+    private final Map<String, String> writeProperties;
+    private final Configuration flinkConfig;
+    private final int cacheMaximumSize;
+
+    ForwardWriterSink(
+        CatalogLoader catalogLoader,
+        Map<String, String> writeProperties,
+        Configuration flinkConfig,
+        int cacheMaximumSize) {
+      this.catalogLoader = catalogLoader;
+      this.writeProperties = writeProperties;
+      this.flinkConfig = flinkConfig;
+      this.cacheMaximumSize = cacheMaximumSize;
+    }
+
+    @Override
+    public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) {
+      return new DynamicWriter(
+          catalogLoader.loadCatalog(),
+          writeProperties,
+          flinkConfig,
+          cacheMaximumSize,
+          new DynamicWriterMetrics(context.metricGroup()),
+          context.getTaskInfo().getIndexOfThisSubtask(),
+          context.getTaskInfo().getAttemptNumber());
+    }
+
+    @Override
+    public Committer<DynamicWriteResult> createCommitter(CommitterInitContext context) {
+      throw new UnsupportedOperationException(
+          "WriterSink is used only for writing; committing is handled by the main sink");
+    }

Review Comment:
   It looks like we are abusing the `Sink` interface and the 
`SupportsCommitter` interface. Do we actually need to use these interfaces at 
all for this class? 



##########
docs/docs/flink-writes.md:
##########
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for every record:
 | `Schema`           | The schema of the record.                               |
 | `Spec`             | The expected partitioning specification for the record. |
 | `RowData`          | The actual row data to be written.                      |
-| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH).           |
+| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or optional). When unspecified, the record won't be shuffled at all. |

Review Comment:
   ```suggestion
   | `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all. |
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

