stevenzwu commented on code in PR #10859: URL: https://github.com/apache/iceberg/pull/10859#discussion_r1702143575
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -349,18 +396,20 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
     // Find out the equality field id list based on the user-provided equality field column names.
     List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();

-    // Convert the requested flink table schema to flink row type.
     RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

+    int writerParallelism =

Review Comment:
   Writer parallelism is also needed by the `distributeDataStream` method, as the downstream operator parallelism for the range partitioner.

##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java:
##########
@@ -60,6 +61,14 @@ private FlinkWriteOptions() {}
   public static final ConfigOption<String> DISTRIBUTION_MODE =
       ConfigOptions.key("distribution-mode").stringType().noDefaultValue();

+  public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
+      ConfigOptions.key("range-distribution-statistics-type")
+          .stringType()
+          .defaultValue(StatisticsType.Auto.name());
+
+  public static final ConfigOption<Double> CLOSE_FILE_COST_WEIGHT_PERCENTAGE =
+      ConfigOptions.key("close-file-cost-weight-percentage").doubleType().defaultValue(0.02d);

Review Comment:
   Open to feedback on the config name, type, and default value. `0.02` means the close-file cost is weighted at 2% of the target weight per task, which avoids placing more than 50 files in one writer task.

##########
docs/docs/flink-configuration.md:
##########
@@ -146,14 +146,16 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```

-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
-| write-parallelism      | Upstream operator parallelism              | Overrides the writer parallelism                             |
+| Flink option                       | Default                       | Description |
+|------------------------------------|-------------------------------|-------------|
+| write-format                       | Table write.format.default    | File format to use for this write operation; parquet, avro, or orc |
+| target-file-size-bytes             | As per table property         | Overrides this table's write.target-file-size-bytes |
+| upsert-enabled                     | Table write.upsert.enabled    | Overrides this table's write.upsert.enabled |
+| overwrite-enabled                  | false                         | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
+| distribution-mode                  | Table write.distribution-mode | Overrides this table's write.distribution-mode. RANGE distribution is in experimental status. |
+| range-distribution-statistics-type | Auto                          | Range distribution data statistics collection type: Map, Sketch, Auto. Map type collects accurate sampling count for every single key. It should be used for low cardinality scenarios (like hundreds or thousands). Sketch type constructs a uniform random sampling via reservoir sampling. It fits well for high cardinality scenarios (like millions), as memory footprint is kept low. Auto starts with Map statistics. But if cardinality is detected higher than a threshold (currently 10,000), statistics are automatically switched to Sketch. |

Review Comment:
   Only added these two rows (lines 156 and 157); the rest is a formatting change due to the wider description column in these two rows.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
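As an aside on the `close-file-cost-weight-percentage` default discussed above: the 50-files-per-task bound follows directly from the `0.02` weight. A minimal arithmetic sketch (illustrative names only, not code from the PR):

```java
// Illustrative sketch only -- not the PR's implementation. It shows why a
// close-file cost weight of 0.02 (2% of the target weight per task) bounds
// a single writer task at 50 files.
public class CloseFileCostSketch {
    public static void main(String[] args) {
        // Default of the proposed close-file-cost-weight-percentage option.
        double closeFileCostWeight = 0.02d;

        // Every file carries at least this fraction of the task's target
        // weight just for the cost of closing it, so the file count per task
        // is capped at the reciprocal of the weight.
        int maxFilesPerTask = (int) Math.round(1.0 / closeFileCostWeight);

        System.out.println(maxFilesPerTask); // prints 50
    }
}
```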
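For context, the docs page above already demonstrates setting write options via SQL hints (`/*+ OPTIONS('upsert-enabled'='true') */`). Assuming the new option keys land as proposed in `FlinkWriteOptions`, a usage sketch would look like:

```sql
-- Sketch: request RANGE distribution with sketch-based statistics for a
-- high-cardinality sort key. Option keys are taken from this PR; table and
-- source names are placeholders.
INSERT INTO tableName /*+ OPTIONS(
    'distribution-mode'='range',
    'range-distribution-statistics-type'='Sketch'
) */
SELECT * FROM sourceTable;
```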