deniskuzZ commented on code in PR #6389:
URL: https://github.com/apache/hive/pull/6389#discussion_r3040621658
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -880,34 +890,77 @@ private boolean shouldDo(List<Integer> partitionPos,
Operator<? extends Operator
if (colStats == null || colStats.isEmpty()) {
return true;
}
- long partCardinality = 1;
-
- // compute cardinality for partition columns
- for (Integer idx : partitionPos) {
- ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
- ColStatistics partStats =
fsParent.getStatistics().getColumnStatisticsFromColName(ci.getInternalName());
- if (partStats == null) {
- // statistics for this partition are for some reason not available
- return true;
- }
- partCardinality = partCardinality * partStats.getCountDistint();
+ long partCardinality = computePartCardinality(
+ partitionPos, customPartitionExprs, tStats, fsParent, allRSCols);
+ if (partCardinality == 0) {
+ // no partition columns at all
+ return false;
+ }
+ if (partCardinality < 0) {
+ // stats unavailable, be conservative -> sort
+ return true;
}
if (MAX_WRITERS < 0) {
- double orcMemPool =
this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
- (Double) OrcConf.MEMORY_POOL.getDefaultValue());
- long orcStripSize =
this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
- (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
- MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
- LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
- long executorMem = memoryInfo.getMaxExecutorMemory();
- MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize;
+ MAX_WRITERS = computeMaxWriters();
+ }
+ return partCardinality > MAX_WRITERS;
+ }
+
+ private long computeMaxWriters() {
+ double orcMemPool =
this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(),
+ (Double) OrcConf.MEMORY_POOL.getDefaultValue());
+ long orcStripSize =
this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(),
+ (Long) OrcConf.STRIPE_SIZE.getDefaultValue());
+ MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf());
+ LOG.debug("Memory info during SDPO opt: {}", memoryInfo);
+ long executorMem = memoryInfo.getMaxExecutorMemory();
+ return (long) (executorMem * orcMemPool) / orcStripSize;
+ }
+ /**
+ * Computes the partition cardinality based on column NDV statistics.
+ * @return positive value = estimated cardinality, 0 = no partition
columns, -1 = stats unavailable
+ */
+ private long computePartCardinality(List<Integer> partitionPos,
+ List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
+ Statistics tStats, Operator<? extends OperatorDesc> fsParent,
+ ArrayList<ExprNodeDesc> allRSCols) {
+
+ if (!partitionPos.isEmpty()) {
+ long cardinality = 1;
+ for (Integer idx : partitionPos) {
+ ColumnInfo ci = fsParent.getSchema().getSignature().get(idx);
+ ColStatistics partStats =
tStats.getColumnStatisticsFromColName(ci.getInternalName());
+ if (partStats == null) {
+ return -1;
+ }
+ cardinality *= partStats.getCountDistint();
+ }
+ return cardinality;
}
- if (partCardinality <= MAX_WRITERS) {
- return false;
+
+ if (!customPartitionExprs.isEmpty()) {
+ // extract source column names from custom expressions (same approach
as allStaticPartitions)
+ Set<String> partColNames = new HashSet<>();
+ for (Function<List<ExprNodeDesc>, ExprNodeDesc> expr :
customPartitionExprs) {
+ ExprNodeDesc resolved = expr.apply(allRSCols);
+ for (ExprNodeColumnDesc colDesc :
ExprNodeDescUtils.findAllColumnDescs(resolved)) {
+ partColNames.add(colDesc.getColumn());
+ }
+ }
+ long cardinality = 1;
+ for (String colName : partColNames) {
+ ColStatistics partStats =
tStats.getColumnStatisticsFromColName(colName);
+ if (partStats == null) {
+ return -1;
+ }
+ cardinality *= partStats.getCountDistint();
Review Comment:
Addressed in commit 6b5e1d032716ef96452bb208d166518f896d7da7.
##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -210,10 +210,17 @@ public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx,
LinkedList<Integer> customSortOrder = new
LinkedList<>(dpCtx.getCustomSortOrder());
LinkedList<Integer> customNullOrder = new
LinkedList<>(dpCtx.getCustomSortNullOrder());
- // If custom expressions (partition or sort) are present, there is an
explicit requirement to do sorting
- if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() &&
!shouldDo(partitionPositions, fsParent)) {
+ // If custom sort expressions are present, there is an explicit
requirement to do sorting.
+ // Custom partition expressions are evaluated inside shouldDo based on
column stats.
+ if (customSortExprs.isEmpty() && !shouldDo(partitionPositions,
customPartitionExprs, fsParent, allRSCols)) {
return null;
}
+
+ // Mark that sorting will be applied with custom partition expressions,
so the writer layer
+ // (e.g. Iceberg) knows the input is ordered and can use a clustered
writer.
+ if (!customPartitionExprs.isEmpty()) {
+ dpCtx.setHasCustomPartitionOrSortExpression(true);
+ }
Review Comment:
Addressed in commit
https://github.com/apache/hive/commit/6b5e1d032716ef96452bb208d166518f896d7da7
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]