mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3009894410
##########
docs/docs/flink-writes.md:
##########
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for every record:
| `Schema` | The schema of the record. |
| `Spec` | The expected partitioning specification for the record. |
| `RowData` | The actual row data to be written. |
-| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH). |
+| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or optional). When unspecified, the record won't be shuffled at all. |
Review Comment:
I think it is better to be explicit, despite the new constructor, which
somewhat hides this implementation detail. It's not completely hidden though,
because users can retrieve the value via `DynamicRecord#distributionMode()`.
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -411,12 +488,32 @@ public DataStreamSink<DynamicRecordInternal> append() {
.name(operatorName("generator"))
.returns(type);
- DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
+ // Forward writer: chained with generator via forward edge, no data shuffle
+ ForwardWriterSink forwardWriterSink =
+ new ForwardWriterSink(
+ sink.catalogLoader, sink.writeProperties, sink.flinkConfig, sink.cacheMaximumSize);
+ TypeInformation<CommittableMessage<DynamicWriteResult>> writeResultTypeInfo =
+ CommittableMessageTypeInfo.of(sink::getWriteResultSerializer);
+
+ DataStream<CommittableMessage<DynamicWriteResult>> forwardWritten =
+ converted
+ .getSideOutput(
+ new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+ .transform(
+ operatorName("Forward-Writer"),
+ writeResultTypeInfo,
+ new SinkWriterOperatorFactory<>(forwardWriterSink))
+ .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
+
+ // Inject forward write results into sink — they'll be unioned in
addPreCommitTopology
+ sink.setForwardWriteResults(forwardWritten);
Review Comment:
Why are we not creating the `forwardWritten` DynamicSink in the `build()`
method? Then we wouldn't have to set it here and could pass it directly to
`DynamicIcebergSink`.
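A minimal sketch of the constructor-injection shape this comment asks about, for illustration only. `SinkSketch`, `BuilderSketch`, and the `List<String>` stand-in for the forward-written stream are hypothetical and are not taken from the PR. Whether the real stream can be created before the sink is constructed is an assumption that would need checking against the actual `build()` and `append()` flow.

```java
import java.util.List;

/** Hypothetical sketch: pass the forward results in at construction instead of via a setter. */
final class ConstructorInjectionSketch {

  /** Stand-in for the sink; the field can be final because it is never set later. */
  static final class SinkSketch {
    private final List<String> forwardWriteResults;

    SinkSketch(List<String> forwardWriteResults) {
      // Defensive copy so the sink's view cannot change after construction
      this.forwardWriteResults = List.copyOf(forwardWriteResults);
    }

    List<String> forwardWriteResults() {
      return forwardWriteResults;
    }
  }

  /** Stand-in for the builder: it creates the forward results first, then the sink. */
  static final class BuilderSketch {
    SinkSketch build(List<String> forwardWritten) {
      return new SinkSketch(forwardWritten);
    }
  }
}
```

With this shape the sink has no window in which the forward results are unset, which is the main benefit over a setter called after construction.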
##########
docs/docs/flink-writes.md:
##########
@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
| `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. |
| `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). |
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is routed from the processor to the writer:
+
+| Mode | Behavior |
+|---------------|----------|
+| `NONE` | Records are distributed across writer subtasks in a round-robin fashion (or by equality fields if set). |
+| `HASH` | Records are distributed by partition key (partitioned tables) or equality fields (unpartitioned tables). Ensures that records for the same partition are handled by the same writer subtask. |
+| (unspecified) | Forward mode: bypasses distribution entirely and sends records directly via a forward edge (see below). |
Review Comment:
```suggestion
| `null` | Forward mode: bypasses distribution entirely and sends records directly via a forward edge (see below). |
```
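To make the three rows of the table above concrete, here is a minimal, self-contained sketch of how a record could be assigned to a writer subtask in each case. It is a simplified model and not the PR's implementation: the `Mode` enum, `targetSubtask`, and the string partition key are hypothetical, the equality-field variant of `NONE` is left out, and the forward case assumes the producer and writer run with the same parallelism.

```java
import java.util.Objects;

/** Hypothetical model of the three routing cases; not the PR's code. */
final class DistributionRoutingSketch {

  /** Stand-in for the distribution mode, reduced to the two values discussed here. */
  enum Mode {
    NONE,
    HASH
  }

  // Next subtask to use for round-robin routing
  private int roundRobinCounter = 0;

  /**
   * Picks the writer subtask for one record.
   *
   * @param mode NONE, HASH, or null for the forward path
   * @param partitionKey key used for HASH routing (hypothetical string form)
   * @param sourceSubtask index of the subtask that produced the record
   * @param writerParallelism number of writer subtasks
   */
  int targetSubtask(Mode mode, String partitionKey, int sourceSubtask, int writerParallelism) {
    if (mode == null) {
      // Forward: no shuffle, the record stays on the producing subtask
      return sourceSubtask;
    }
    switch (mode) {
      case HASH:
        // Same key always maps to the same subtask
        return Math.floorMod(Objects.hashCode(partitionKey), writerParallelism);
      case NONE:
      default:
        // Round-robin across all writer subtasks
        int target = roundRobinCounter;
        roundRobinCounter = (roundRobinCounter + 1) % writerParallelism;
        return target;
    }
  }
}
```

The explicit `mode == null` branch is the point of the suggestion: writing `null` in the docs matches what a caller would actually observe and branch on.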
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer()
return new DynamicWriteResultSerializer();
}
+ /**
+ * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the forward write path.
+ * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} emits committables
+ * downstream. The committer is never called — committing is handled by the
main sink.
+ */
+ @VisibleForTesting
+ static class ForwardWriterSink
+ implements Sink<DynamicRecordInternal>, SupportsCommitter<DynamicWriteResult> {
+
+ private final CatalogLoader catalogLoader;
+ private final Map<String, String> writeProperties;
+ private final Configuration flinkConfig;
+ private final int cacheMaximumSize;
+
+ ForwardWriterSink(
+ CatalogLoader catalogLoader,
+ Map<String, String> writeProperties,
+ Configuration flinkConfig,
+ int cacheMaximumSize) {
+ this.catalogLoader = catalogLoader;
+ this.writeProperties = writeProperties;
+ this.flinkConfig = flinkConfig;
+ this.cacheMaximumSize = cacheMaximumSize;
+ }
+
+ @Override
+ public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) {
+ return new DynamicWriter(
+ catalogLoader.loadCatalog(),
+ writeProperties,
+ flinkConfig,
+ cacheMaximumSize,
+ new DynamicWriterMetrics(context.metricGroup()),
+ context.getTaskInfo().getIndexOfThisSubtask(),
+ context.getTaskInfo().getAttemptNumber());
+ }
+
+ @Override
+ public Committer<DynamicWriteResult> createCommitter(CommitterInitContext context) {
+ throw new UnsupportedOperationException(
+ "WriterSink is used only for writing; committing is handled by the main sink");
+ }
Review Comment:
It looks like we are abusing the `Sink` interface and the
`SupportsCommitter` interface. Do we actually need to use these interfaces at
all for this class?
##########
docs/docs/flink-writes.md:
##########
@@ -483,11 +483,10 @@ We need the following information (DynamicRecord) for every record:
| `Schema` | The schema of the record. |
| `Spec` | The expected partitioning specification for the record. |
| `RowData` | The actual row data to be written. |
-| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH). |
+| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or optional). When unspecified, the record won't be shuffled at all. |
Review Comment:
```suggestion
| `DistributionMode` | The distribution mode for writing the record (NONE, HASH or `null`). When `null`, the record won't be shuffled at all. |
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]