mxm commented on code in PR #13608:
URL: https://github.com/apache/iceberg/pull/13608#discussion_r2218766727


##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are the key configuration methods:
+
+| Method                                               | Description |
+|------------------------------------------------------|-------------|
+| `set(String property, String value)`                 | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)`             | Set multiple properties at once |
+| `overwrite(boolean enabled)`                         | Enable overwrite mode |
+| `writeParallelism(int parallelism)`                  | Set writer parallelism |
+| `uidPrefix(String prefix)`                           | Set operator UID prefix |
+| `snapshotProperties(Map<String, String> properties)` | Set snapshot metadata properties |
+| `toBranch(String branch)`                            | Write to a specific branch |
+| `cacheMaxSize(int maxSize)`                          | Set cache size for table metadata |
+| `cacheRefreshMs(long refreshMs)`                     | Set cache refresh interval |
+| `inputSchemasPerTableCacheMaxSize(int size)`         | Set max input schemas to cache per table |
+| `immediateTableUpdate(boolean enabled)`              | Controls whether table metadata (schema/partition spec) updates immediately (default: false) |
+
+
+### Notes
+
+- **Range distribution mode**: Currently, the dynamic sink does not support the `RANGE` distribution mode.
+- **Property precedence**: When table properties conflict with sink properties, the table properties override the sink configuration.
+
+#### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+
+// Set common properties
+builder
+    .set("write.format", "parquet")
+    .set("write.upsert.enabled", "true")
+    .set("write.distribution-mode", "hash");
+
+// Set specific options
+builder
+    .writeParallelism(4)
+    .uidPrefix("dynamic-sink")
+    .cacheMaxSize(500)
+    .cacheRefreshMs(5000);
+
+// Add generator and append sink
+builder.generator(new CustomRecordGenerator());
+builder.append();
+```
+
+#### Dynamic Routing Configuration
+
+Dynamic table routing can be customized by implementing the `DynamicRecordGenerator` interface:
+
+```java
+public class CustomRecordGenerator implements DynamicRecordGenerator<RowData> {
+    @Override
+    public DynamicRecord generate(RowData row) {
+        DynamicRecord record = new DynamicRecord();
+        // Set table name based on business logic
+        TableIdentifier tableIdentifier = TableIdentifier.of(database, tableName);
+        record.setTableIdentifier(tableIdentifier);
+        record.setData(row);
+        // Set the maximum number of parallel writers for a given table/branch/schema/spec
+        record.writeParallelism(2);
+        return record;
+    }
+}
+
+// Set custom record generator when building the sink
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+builder.generator(new CustomRecordGenerator());
+// ... other config ...
+builder.append();
+```
+The user should provide a converter which converts each input record into a `DynamicRecord`.
+The following information (`DynamicRecord` fields) is needed for every record:
+
+| Property           | Description |
+|--------------------|-------------|
+| `TableIdentifier`  | The target table to which the record will be written. |
+| `Branch`           | The target branch for writing the record (optional). |
+| `Schema`           | The schema of the record. |
+| `Spec`             | The expected partitioning specification for the record. |
+| `RowData`          | The actual row data to be written. |
+| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH). |
+| `Parallelism`      | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
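
The earlier example only sets the table identifier, the row, and the write parallelism. As a rough sketch of how the optional fields listed above might be populated, the snippet below assumes setter names (`setBranch`, `setSchema`, `setSpec`, `setDistributionMode`) that mirror the table and the `setTableIdentifier`/`setData` calls shown earlier; `eventSchema` is a placeholder, so verify the actual `DynamicRecord` API before relying on it:

```java
// Hypothetical generator that also fills in the optional DynamicRecord fields.
// The setter names below are assumptions, not confirmed API.
public class RoutingRecordGenerator implements DynamicRecordGenerator<RowData> {
    @Override
    public DynamicRecord generate(RowData row) {
        DynamicRecord record = new DynamicRecord();
        record.setTableIdentifier(TableIdentifier.of("db", "events")); // required: target table
        record.setData(row);                                           // required: the row to write
        record.setBranch("audit");                                     // optional: target branch
        record.setSchema(eventSchema);                                 // optional: record schema (placeholder variable)
        record.setSpec(PartitionSpec.unpartitioned());                 // optional: expected partition spec
        record.setDistributionMode(DistributionMode.HASH);             // optional: NONE or HASH
        record.writeParallelism(2);                                    // optional: max writers per WriteTarget
        return record;
    }
}
```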

Review Comment:
   upsertMode and equality fields are missing here. We can mention these as optional.



##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are the key configuration methods:
+
+| Method                                               | Description |
+|------------------------------------------------------|-------------|
+| `set(String property, String value)`                 | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)`             | Set multiple properties at once |
+| `overwrite(boolean enabled)`                         | Enable overwrite mode |
+| `writeParallelism(int parallelism)`                  | Set writer parallelism |
+| `uidPrefix(String prefix)`                           | Set operator UID prefix |
+| `snapshotProperties(Map<String, String> properties)` | Set snapshot metadata properties |
+| `toBranch(String branch)`                            | Write to a specific branch |
+| `cacheMaxSize(int maxSize)`                          | Set cache size for table metadata |
+| `cacheRefreshMs(long refreshMs)`                     | Set cache refresh interval |
+| `inputSchemasPerTableCacheMaxSize(int size)`         | Set max input schemas to cache per table |
+| `immediateTableUpdate(boolean enabled)`              | Controls whether table metadata (schema/partition spec) updates immediately (default: false) |
+
+
+### Notes
+
+- **Range distribution mode**: Currently, the dynamic sink does not support the `RANGE` distribution mode.

Review Comment:
   Let's mention we currently fall back to HASH if RANGE is used.



##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are the key configuration methods:
+
+| Method                                               | Description |
+|------------------------------------------------------|-------------|
+| `set(String property, String value)`                 | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)`             | Set multiple properties at once |
+| `overwrite(boolean enabled)`                         | Enable overwrite mode |

Review Comment:
   Can we move all properties configs to the bottom?



##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are the key configuration methods:
+
+| Method                                               | Description |
+|------------------------------------------------------|-------------|
+| `set(String property, String value)`                 | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)`             | Set multiple properties at once |
+| `overwrite(boolean enabled)`                         | Enable overwrite mode |
+| `writeParallelism(int parallelism)`                  | Set writer parallelism |
+| `uidPrefix(String prefix)`                           | Set operator UID prefix |
+| `snapshotProperties(Map<String, String> properties)` | Set snapshot metadata properties |
+| `toBranch(String branch)`                            | Write to a specific branch |
+| `cacheMaxSize(int maxSize)`                          | Set cache size for table metadata |
+| `cacheRefreshMs(long refreshMs)`                     | Set cache refresh interval |
+| `inputSchemasPerTableCacheMaxSize(int size)`         | Set max input schemas to cache per table |
+| `immediateTableUpdate(boolean enabled)`              | Controls whether table metadata (schema/partition spec) updates immediately (default: false) |
+
+
+### Notes
+
+- **Range distribution mode**: Currently, the dynamic sink does not support the `RANGE` distribution mode.
+- **Property precedence**: When table properties conflict with sink properties, the table properties override the sink configuration.
+
+#### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+
+// Set common properties
+builder
+    .set("write.format", "parquet")
+    .set("write.upsert.enabled", "true")
+    .set("write.distribution-mode", "hash");

Review Comment:
   I don't think we should advertise these common options. I believe those are not actually supported.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

