stevenzwu commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1702143575
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -349,18 +396,20 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
- // Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+ int writerParallelism =
Review Comment:
Writer parallelism is also needed by the `distributeDataStream` method, as
the downstream operator parallelism for the range partitioner.
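The resolution order the comment implies can be sketched as follows. This is a minimal illustration, not the PR's actual code: the method and class names here (`WriterParallelismSketch`, `resolveWriterParallelism`) are hypothetical, assuming an explicitly configured `write-parallelism` takes precedence over the upstream operator's parallelism, and that the resolved value is shared with the range partitioner.

```java
import java.util.Optional;

// Hedged sketch: resolve writer parallelism once, so that both the writer
// operator and the range partitioner (which needs the downstream operator
// parallelism) see the same value.
public class WriterParallelismSketch {
  static int resolveWriterParallelism(Optional<Integer> configured, int upstreamParallelism) {
    // An explicit write-parallelism option wins; otherwise fall back to
    // the upstream operator's parallelism.
    return configured.orElse(upstreamParallelism);
  }

  public static void main(String[] args) {
    System.out.println(resolveWriterParallelism(Optional.empty(), 4)); // 4
    System.out.println(resolveWriterParallelism(Optional.of(8), 4));   // 8
  }
}
```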
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java:
##########
@@ -60,6 +61,14 @@ private FlinkWriteOptions() {}
public static final ConfigOption<String> DISTRIBUTION_MODE =
ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+ public static final ConfigOption<String> RANGE_DISTRIBUTION_STATISTICS_TYPE =
+ ConfigOptions.key("range-distribution-statistics-type")
+ .stringType()
+ .defaultValue(StatisticsType.Auto.name());
+
+  public static final ConfigOption<Double> CLOSE_FILE_COST_WEIGHT_PERCENTAGE =
+      ConfigOptions.key("close-file-cost-weight-percentage").doubleType().defaultValue(0.02d);
Review Comment:
Open to feedback on the config name, type, and default value. `0.02` means
the close-file cost is 2% of the target weight per task, which avoids placing
more than 50 files in one writer task.
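The arithmetic behind the 50-file bound can be made concrete. This is a hedged sketch under the stated assumption that each file contributes at least the close-file cost toward a task's weight budget; the class and method names are illustrative, not the PR's API.

```java
// Hedged sketch: why a 2% close-file cost weight caps files per task near 50.
public class CloseFileCostSketch {
  static long closeFileCost(long targetWeightPerTask, double weightPercentage) {
    // Each file closed adds this fixed cost to the task's weight.
    return (long) Math.ceil(targetWeightPerTask * weightPercentage);
  }

  static long maxFilesPerTask(long targetWeightPerTask, double weightPercentage) {
    // Even near-empty files each cost closeFileCost, so a task's weight
    // budget can hold at most 1 / weightPercentage files.
    return targetWeightPerTask / closeFileCost(targetWeightPerTask, weightPercentage);
  }

  public static void main(String[] args) {
    System.out.println(maxFilesPerTask(1_000_000L, 0.02d)); // 50
  }
}
```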
##########
docs/docs/flink-configuration.md:
##########
@@ -146,14 +146,16 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
-| write-parallelism      | Upstream operator parallelism              | Overrides the writer parallelism                             |
+| Flink option                       | Default                       | Description                                                  |
+|------------------------------------|-------------------------------|--------------------------------------------------------------|
+| write-format                       | Table write.format.default    | File format to use for this write operation; parquet, avro, or orc |
+| target-file-size-bytes             | As per table property         | Overrides this table's write.target-file-size-bytes          |
+| upsert-enabled                     | Table write.upsert.enabled    | Overrides this table's write.upsert.enabled                  |
+| overwrite-enabled                  | false                         | Overwrite the table's data; overwrite mode shouldn't be enabled when configuring to use UPSERT data stream. |
+| distribution-mode                  | Table write.distribution-mode | Overrides this table's write.distribution-mode. RANGE distribution is in experimental status. |
+| range-distribution-statistics-type | Auto                          | Range distribution data statistics collection type: Map, Sketch, Auto. Map type collects accurate sampling counts for every single key; it should be used for low-cardinality scenarios (like hundreds or thousands). Sketch type constructs a uniform random sample via reservoir sampling; it fits high-cardinality scenarios (like millions) well, as the memory footprint is kept low. Auto starts with Map statistics, but if cardinality is detected to be higher than a threshold (currently 10,000), statistics are automatically switched to Sketch. |
Review Comment:
Only added these two rows, lines 156 and 157. The rest are formatting changes
due to the wider description column in these two rows.
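The Auto behavior described in the new table row can be sketched as a small state machine: count keys exactly in a map, and fall back to reservoir sampling once distinct-key cardinality crosses the threshold. This is a minimal illustration only; the class, field, and constant names here are hypothetical (the real implementation lives in the Flink sink shuffle code), and the 10,000 threshold is taken from the docs text above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hedged sketch of the Auto statistics strategy: exact Map counts for low
// cardinality, switching to a reservoir-sampling Sketch when the number of
// distinct keys exceeds a threshold.
public class AutoStatisticsSketch {
  static final int SWITCH_THRESHOLD = 10_000; // per the docs, currently 10,000
  static final int RESERVOIR_SIZE = 1_000;    // illustrative sample size

  private final Map<String, Long> mapStats = new HashMap<>();
  private final List<String> reservoir = new ArrayList<>();
  private final Random random = new Random(42);
  private long seen = 0;
  private boolean sketchMode = false;

  void add(String key) {
    seen++;
    if (!sketchMode) {
      mapStats.merge(key, 1L, Long::sum);
      if (mapStats.size() > SWITCH_THRESHOLD) {
        // Cardinality too high for exact counts: seed the sample from the
        // map's keys and continue with reservoir sampling.
        sketchMode = true;
        mapStats.keySet().stream().limit(RESERVOIR_SIZE).forEach(reservoir::add);
        mapStats.clear();
      }
    } else if (reservoir.size() < RESERVOIR_SIZE) {
      reservoir.add(key);
    } else {
      // Classic reservoir sampling: replace a random slot with probability
      // RESERVOIR_SIZE / seen, keeping the sample uniform over all keys seen.
      long j = (long) (random.nextDouble() * seen);
      if (j < RESERVOIR_SIZE) {
        reservoir.set((int) j, key);
      }
    }
  }

  boolean isSketch() {
    return sketchMode;
  }

  public static void main(String[] args) {
    AutoStatisticsSketch stats = new AutoStatisticsSketch();
    for (int i = 0; i < 20_000; i++) {
      stats.add("key-" + i); // 20,000 distinct keys forces the switch
    }
    System.out.println(stats.isSketch()); // true
  }
}
```

The point of the sketch is the memory trade-off the docs describe: the map's size grows with cardinality, while the reservoir stays bounded at `RESERVOIR_SIZE` regardless of how many keys stream through.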
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]