stevenzwu commented on code in PR #7077:
URL: https://github.com/apache/iceberg/pull/7077#discussion_r1162037437
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java:
##########
@@ -153,16 +162,37 @@ public String orcCompressionStrategy() {
             .parse();
   }
-  public DistributionMode distributionMode() {
+  public DistributionMode distributionMode(List<Integer> equalityFieldIds) {
     String modeName =
         confParser
             .stringConf()
             .option(FlinkWriteOptions.DISTRIBUTION_MODE.key())
             .flinkConfig(FlinkWriteOptions.DISTRIBUTION_MODE)
             .tableProperty(TableProperties.WRITE_DISTRIBUTION_MODE)
-            .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
-            .parse();
-    return DistributionMode.fromName(modeName);
+            .parseOptional();
+
+    return modeName == null
+        ? defaultWriteDistributionMode(equalityFieldIds)
+        : DistributionMode.fromName(modeName);
+  }
+
+  private DistributionMode defaultWriteDistributionMode(List<Integer> equalityFieldIds) {
+    if (table.sortOrder().isSorted()) {
+      return RANGE;
+    } else if (table.spec().isPartitioned()) {
+      if (!equalityFieldIds.isEmpty()) {
+        PartitionSpec partitionSpec = table.spec();
+        for (PartitionField partitionField : partitionSpec.fields()) {
+          if (!equalityFieldIds.contains(partitionField.sourceId())) {
+            return NONE;
+          }
+        }
+      }
+
+      return HASH;
Review Comment:
Is this logic correct? We should only use HASH if the equalityFieldIds list is
not empty and it contains the source IDs of all the partition fields. Right now,
if equalityFieldIds is empty, we fall through to HASH distribution.
--
This is an automated message from the Apache Git Service.