huaxingao commented on code in PR #8042:
URL: https://github.com/apache/iceberg/pull/8042#discussion_r1263268472


##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDistributionAndOrderingUtils.scala:
##########
@@ -57,10 +61,30 @@ object ExtendedDistributionAndOrderingUtils {
         } else {
           conf.numShufflePartitions
         }
-        // the conversion to catalyst expressions above produces SortOrder 
expressions
-        // for OrderedDistribution and generic expressions for 
ClusteredDistribution
-        // this allows RepartitionByExpression to pick either range or hash 
partitioning
-        RepartitionByExpression(ArraySeq.unsafeWrapArray(distribution), query, 
finalNumPartitions)
+
+        val tableProperties = if(table.isInstanceOf[RowLevelOperationTable])  {
+          table.asInstanceOf[RowLevelOperationTable].table.properties()
+        } else {
+          table.properties()
+        }
+
+        val strictDistributionMode = tableProperties
+          .getOrDefault(TableProperties.STRICT_TABLE_DISTRIBUTION_AND_ORDERING,
+            TableProperties.STRICT_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT)
+        if(strictDistributionMode.equals("false") && 
write.requiredDistribution().isInstanceOf[ClusteredDistribution]) {

Review Comment:
   space after `if`?



##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDistributionAndOrderingUtils.scala:
##########
@@ -57,10 +61,30 @@ object ExtendedDistributionAndOrderingUtils {
         } else {
           conf.numShufflePartitions
         }
-        // the conversion to catalyst expressions above produces SortOrder 
expressions
-        // for OrderedDistribution and generic expressions for 
ClusteredDistribution
-        // this allows RepartitionByExpression to pick either range or hash 
partitioning
-        RepartitionByExpression(ArraySeq.unsafeWrapArray(distribution), query, 
finalNumPartitions)
+
+        val tableProperties = if(table.isInstanceOf[RowLevelOperationTable])  {
+          table.asInstanceOf[RowLevelOperationTable].table.properties()
+        } else {
+          table.properties()
+        }
+
+        val strictDistributionMode = tableProperties
+          .getOrDefault(TableProperties.STRICT_TABLE_DISTRIBUTION_AND_ORDERING,
+            TableProperties.STRICT_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT)
+        if(strictDistributionMode.equals("false") && 
write.requiredDistribution().isInstanceOf[ClusteredDistribution]) {
+          // if strict distribution mode is not enabled, then we fallback to 
spark AQE
+          // to determine the number of partitions by colaesceing and 
un-skewing partitions
+          // Also to note, Rebalance is only supported for hash distribution 
mode till spark 3.3
+          // By default the strictDistributionMode is set to true, to not 
disrupt regular
+          // plan of RepartitionByExpression
+          RebalancePartitions(ArraySeq.unsafeWrapArray(distribution), query)
+        }
+        else {

Review Comment:
   `} else {`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to