Guosmilesmile commented on code in PR #13608:
URL: https://github.com/apache/iceberg/pull/13608#discussion_r2218809163
##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 - The `RANGE` distribution mode is not yet available for the `IcebergSink`
 - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**
+   Table schemas and partition specs are updated during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the Builder pattern.
+Here are the key configuration methods:
+
+| Method                                               | Description                                                                                  |
+|------------------------------------------------------|----------------------------------------------------------------------------------------------|
+| `set(String property, String value)`                 | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)`             | Set multiple properties at once                                                              |
+| `overwrite(boolean enabled)`                         | Enable overwrite mode                                                                        |
+| `writeParallelism(int parallelism)`                  | Set writer parallelism                                                                       |
+| `uidPrefix(String prefix)`                           | Set operator UID prefix                                                                      |
+| `snapshotProperties(Map<String, String> properties)` | Set snapshot metadata properties                                                             |
+| `toBranch(String branch)`                            | Write to a specific branch                                                                   |
+| `cacheMaxSize(int maxSize)`                          | Set cache size for table metadata                                                            |
+| `cacheRefreshMs(long refreshMs)`                     | Set cache refresh interval                                                                   |
+| `inputSchemasPerTableCacheMaxSize(int size)`         | Set max input schemas to cache per table                                                     |
+| `immediateTableUpdate(boolean enabled)`              | Controls whether table metadata (schema/partition spec) updates immediately (default: `false`) |
+
+
+### Notes
+
+- **Range distribution mode**: Currently, the dynamic sink does not support the `RANGE` distribution mode.

Review Comment:
   Right, I have added it.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
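The per-record routing idea behind the dynamic sink discussed above (a user-defined generator inspects each event and decides which Iceberg table it should land in) can be sketched in plain Java, independent of the Flink and Iceberg APIs. Everything here (`Event`, `routeTable`, the naming scheme) is a hypothetical illustration, not part of the Iceberg API:

```java
import java.util.Locale;
import java.util.Map;

// Minimal sketch of per-record table routing: derive a target table
// name from event fields, the way a DynamicRecord generator would pick
// a table identifier for each incoming record. All names here are
// illustrative assumptions, not Iceberg or Flink API.
public class RoutingSketch {

    // A stand-in for whatever event type flows through the DataStream.
    record Event(String source, String type, Map<String, String> payload) {}

    // Route each event to a table named after its source and type,
    // e.g. source "Billing" + type "Invoice" -> "events_billing_invoice".
    static String routeTable(Event event) {
        return ("events_" + event.source() + "_" + event.type())
                .toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Event e = new Event("Billing", "Invoice", Map.of());
        System.out.println(routeTable(e)); // events_billing_invoice
    }
}
```

In the real sink, this mapping would live inside the generator passed to `.generator(...)`, which also carries the schema and partition spec for each record.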