yegangy0718 commented on code in PR #10331: URL: https://github.com/apache/iceberg/pull/10331#discussion_r1615024325
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java: ########## @@ -30,71 +42,99 @@ * {@link AggregatedStatistics} received from {@link DataStatisticsOperator} subtasks for specific * checkpoint. */ -class AggregatedStatisticsTracker<D extends DataStatistics<D, S>, S> { +class AggregatedStatisticsTracker { private static final Logger LOG = LoggerFactory.getLogger(AggregatedStatisticsTracker.class); private static final double ACCEPT_PARTIAL_AGGR_THRESHOLD = 90; private final String operatorName; - private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer; private final int parallelism; + private final TypeSerializer<DataStatistics> statisticsSerializer; + private final int downstreamParallelism; + private final StatisticsType statisticsType; Review Comment: the field is not being used ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java: ########## @@ -47,151 +48,190 @@ * distribution to downstream subtasks. 
*/ @Internal -class DataStatisticsOperator<D extends DataStatistics<D, S>, S> - extends AbstractStreamOperator<DataStatisticsOrRecord<D, S>> - implements OneInputStreamOperator<RowData, DataStatisticsOrRecord<D, S>>, OperatorEventHandler { +public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrRecord> + implements OneInputStreamOperator<RowData, StatisticsOrRecord>, OperatorEventHandler { private static final long serialVersionUID = 1L; private final String operatorName; private final RowDataWrapper rowDataWrapper; private final SortKey sortKey; private final OperatorEventGateway operatorEventGateway; - private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer; - private transient volatile DataStatistics<D, S> localStatistics; - private transient volatile DataStatistics<D, S> globalStatistics; - private transient ListState<DataStatistics<D, S>> globalStatisticsState; + private final int downstreamParallelism; + private final StatisticsType statisticsType; + private final TypeSerializer<DataStatistics> taskStatisticsSerializer; + private final TypeSerializer<AggregatedStatistics> aggregatedStatisticsSerializer; + + private transient int parallelism; + private transient int subtaskIndex; + private transient ListState<AggregatedStatistics> globalStatisticsState; + // current statistics type may be different from the config due to possible + // migration from Map statistics to Sketch statistics when high cardinality detected + private transient volatile StatisticsType taskStatisticsType; + private transient volatile DataStatistics localStatistics; + private transient volatile AggregatedStatistics globalStatistics; DataStatisticsOperator( String operatorName, Schema schema, SortOrder sortOrder, OperatorEventGateway operatorEventGateway, - TypeSerializer<DataStatistics<D, S>> statisticsSerializer) { + int downstreamParallelism, + StatisticsType statisticsType) { this.operatorName = operatorName; this.rowDataWrapper = new 
RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct()); this.sortKey = new SortKey(schema, sortOrder); this.operatorEventGateway = operatorEventGateway; - this.statisticsSerializer = statisticsSerializer; + this.downstreamParallelism = downstreamParallelism; + this.statisticsType = statisticsType; + + SortKeySerializer sortKeySerializer = new SortKeySerializer(schema, sortOrder); + this.taskStatisticsSerializer = new DataStatisticsSerializer(sortKeySerializer); + this.aggregatedStatisticsSerializer = new AggregatedStatisticsSerializer(sortKeySerializer); } @Override public void initializeState(StateInitializationContext context) throws Exception { - localStatistics = statisticsSerializer.createInstance(); - globalStatisticsState = + this.parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + this.subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.globalStatisticsState = context .getOperatorStateStore() .getUnionListState( - new ListStateDescriptor<>("globalStatisticsState", statisticsSerializer)); + new ListStateDescriptor<>("globalStatisticsState", aggregatedStatisticsSerializer)); if (context.isRestored()) { - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); if (globalStatisticsState.get() == null || !globalStatisticsState.get().iterator().hasNext()) { LOG.warn( "Operator {} subtask {} doesn't have global statistics state to restore", operatorName, subtaskIndex); - globalStatistics = statisticsSerializer.createInstance(); } else { LOG.info( - "Restoring operator {} global statistics state for subtask {}", - operatorName, - subtaskIndex); - globalStatistics = globalStatisticsState.get().iterator().next(); + "Operator {} subtask {} restoring global statistics state", operatorName, subtaskIndex); + this.globalStatistics = globalStatisticsState.get().iterator().next(); } - } else { - globalStatistics = statisticsSerializer.createInstance(); } + + this.taskStatisticsType = 
StatisticsUtil.collectType(statisticsType, globalStatistics); Review Comment: Does the Iceberg repo follow the convention of always using `this.` to refer to class fields? If that's the case, then let's update `globalStatistics` to `this.globalStatistics`, as we do on line 113. ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatistics.java: ########## @@ -19,53 +19,87 @@ package org.apache.iceberg.flink.sink.shuffle; import java.io.Serializable; -import org.apache.flink.api.common.typeutils.TypeSerializer; +import java.util.Arrays; +import java.util.Map; +import org.apache.iceberg.SortKey; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Objects; /** * AggregatedStatistics is used by {@link DataStatisticsCoordinator} to collect {@link * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific checkpoint. It stores * the merged {@link DataStatistics} result from all reported subtasks. 
*/ -class AggregatedStatistics<D extends DataStatistics<D, S>, S> implements Serializable { - +class AggregatedStatistics implements Serializable { private final long checkpointId; - private final DataStatistics<D, S> dataStatistics; - - AggregatedStatistics(long checkpoint, TypeSerializer<DataStatistics<D, S>> statisticsSerializer) { - this.checkpointId = checkpoint; - this.dataStatistics = statisticsSerializer.createInstance(); - } + private final StatisticsType type; + private final Map<SortKey, Long> keyFrequency; + private final SortKey[] rangeBounds; - AggregatedStatistics(long checkpoint, DataStatistics<D, S> dataStatistics) { - this.checkpointId = checkpoint; - this.dataStatistics = dataStatistics; - } - - long checkpointId() { - return checkpointId; + AggregatedStatistics( + long checkpointId, + StatisticsType type, + Map<SortKey, Long> keyFrequency, + SortKey[] rangeBounds) { + this.checkpointId = checkpointId; + this.type = type; + this.keyFrequency = keyFrequency; + this.rangeBounds = rangeBounds; Review Comment: Do we want to add a check here to make sure `keyFrequency` and `rangeBounds` are not both set at the same time? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java: ########## @@ -47,151 +48,190 @@ * distribution to downstream subtasks. 
*/ @Internal -class DataStatisticsOperator<D extends DataStatistics<D, S>, S> - extends AbstractStreamOperator<DataStatisticsOrRecord<D, S>> - implements OneInputStreamOperator<RowData, DataStatisticsOrRecord<D, S>>, OperatorEventHandler { +public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrRecord> + implements OneInputStreamOperator<RowData, StatisticsOrRecord>, OperatorEventHandler { private static final long serialVersionUID = 1L; private final String operatorName; private final RowDataWrapper rowDataWrapper; private final SortKey sortKey; private final OperatorEventGateway operatorEventGateway; - private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer; - private transient volatile DataStatistics<D, S> localStatistics; - private transient volatile DataStatistics<D, S> globalStatistics; - private transient ListState<DataStatistics<D, S>> globalStatisticsState; + private final int downstreamParallelism; + private final StatisticsType statisticsType; + private final TypeSerializer<DataStatistics> taskStatisticsSerializer; + private final TypeSerializer<AggregatedStatistics> aggregatedStatisticsSerializer; + + private transient int parallelism; + private transient int subtaskIndex; + private transient ListState<AggregatedStatistics> globalStatisticsState; + // current statistics type may be different from the config due to possible + // migration from Map statistics to Sketch statistics when high cardinality detected + private transient volatile StatisticsType taskStatisticsType; + private transient volatile DataStatistics localStatistics; + private transient volatile AggregatedStatistics globalStatistics; DataStatisticsOperator( String operatorName, Schema schema, SortOrder sortOrder, OperatorEventGateway operatorEventGateway, - TypeSerializer<DataStatistics<D, S>> statisticsSerializer) { + int downstreamParallelism, + StatisticsType statisticsType) { this.operatorName = operatorName; this.rowDataWrapper = new 
RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct()); this.sortKey = new SortKey(schema, sortOrder); this.operatorEventGateway = operatorEventGateway; - this.statisticsSerializer = statisticsSerializer; + this.downstreamParallelism = downstreamParallelism; + this.statisticsType = statisticsType; + + SortKeySerializer sortKeySerializer = new SortKeySerializer(schema, sortOrder); + this.taskStatisticsSerializer = new DataStatisticsSerializer(sortKeySerializer); + this.aggregatedStatisticsSerializer = new AggregatedStatisticsSerializer(sortKeySerializer); } @Override public void initializeState(StateInitializationContext context) throws Exception { - localStatistics = statisticsSerializer.createInstance(); - globalStatisticsState = + this.parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + this.subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.globalStatisticsState = context .getOperatorStateStore() .getUnionListState( - new ListStateDescriptor<>("globalStatisticsState", statisticsSerializer)); + new ListStateDescriptor<>("globalStatisticsState", aggregatedStatisticsSerializer)); if (context.isRestored()) { - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); if (globalStatisticsState.get() == null || !globalStatisticsState.get().iterator().hasNext()) { LOG.warn( "Operator {} subtask {} doesn't have global statistics state to restore", operatorName, subtaskIndex); - globalStatistics = statisticsSerializer.createInstance(); } else { LOG.info( - "Restoring operator {} global statistics state for subtask {}", - operatorName, - subtaskIndex); - globalStatistics = globalStatisticsState.get().iterator().next(); + "Operator {} subtask {} restoring global statistics state", operatorName, subtaskIndex); + this.globalStatistics = globalStatisticsState.get().iterator().next(); } - } else { - globalStatistics = statisticsSerializer.createInstance(); } + + this.taskStatisticsType = 
StatisticsUtil.collectType(statisticsType, globalStatistics); Review Comment: Same comment for line 124 and 125 ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.datasketches.sampling.ReservoirItemsSketch; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.StructLike; + +class SketchUtil { + static final int COORDINATOR_MIN_RESERVOIR_SIZE = 10_000; + static final int COORDINATOR_MAX_RESERVOIR_SIZE = 1_000_000; + static final int COORDINATOR_TARGET_PARTITIONS_MULTIPLIER = 100; + static final int OPERATOR_OVER_SAMPLE_RATIO = 10; + + // switch the statistics tracking from map to sketch if the cardinality of the sort key is over + // this threshold. It is hardcoded for now, we can revisit in the future if config is needed. 
+ static final int OPERATOR_SKETCH_SWITCH_THRESHOLD = 10_000; + static final int COORDINATOR_SKETCH_SWITCH_THRESHOLD = 100_000; + + private SketchUtil() {} + + /** + * The larger the reservoir size, the more accurate for range bounds calculation and the more + * balanced range distribution. + * + * <p>Here are the heuristic rules + * <li>Target size: numPartitions x 100 to achieve good accuracy and is easier to calculate the + * range bounds + * <li>Min is 10K to achieve good accuracy while memory footprint is still relatively small + * <li>Max is 1M to cap the memory footprint on coordinator + * + * @param numPartitions number of range partitions which equals to downstream operator parallelism + * @return reservoir size + */ + static int determineCoordinatorReservoirSize(int numPartitions) { + int reservoirSize = numPartitions * COORDINATOR_TARGET_PARTITIONS_MULTIPLIER; + + if (reservoirSize < COORDINATOR_MIN_RESERVOIR_SIZE) { + // adjust it up and still make reservoirSize divisible by numPartitions + int remainder = COORDINATOR_MIN_RESERVOIR_SIZE % numPartitions; + reservoirSize = COORDINATOR_MIN_RESERVOIR_SIZE + (numPartitions - remainder); + } else if (reservoirSize > COORDINATOR_MAX_RESERVOIR_SIZE) { + // adjust it down and still make reservoirSize divisible by numPartitions + int remainder = COORDINATOR_MAX_RESERVOIR_SIZE % numPartitions; + reservoirSize = COORDINATOR_MAX_RESERVOIR_SIZE - remainder; + } + + return reservoirSize; + } + + /** + * Determine the sampling reservoir size where operator subtasks collect data statistics. 
+ * + * <p>Here are the heuristic rules + * <li>Target size is "coordinator reservoir size * over sampling ration (10) / operator + * parallelism" + * <li>Min is 1K to achieve good accuracy while memory footprint is still relatively small + * <li>Max is 100K to cap the memory footprint on coordinator + * + * @param numPartitions number of range partitions which equals to downstream operator parallelism + * @param operatorParallelism data statistics operator parallelism + * @return reservoir size + */ + static int determineOperatorReservoirSize(int operatorParallelism, int numPartitions) { + int coordinatorReservoirSize = determineCoordinatorReservoirSize(numPartitions); + int totalOperatorSamples = coordinatorReservoirSize * OPERATOR_OVER_SAMPLE_RATIO; + return (int) Math.ceil((double) totalOperatorSamples / operatorParallelism); + } + + /** + * To understand how range bounds are used in range partitioning, heere is an example for human + * ages with 4 partitions: [15, 32, 60]. The 4 ranges would be + * <li>age <= 15 + * <li>age > 15 && age <= 32 + * <li>age >32 && age <= 60 + * <li>age > 60 + * + * @param numPartitions number of partitions which maps to downstream operator parallelism + * @param sketch aggregated reservoir sampling sketch + * @return list of range partition bounds. It should be a sorted list (ascending). Number of items + * should be {@code numPartitions - 1}. if numPartitions is 1, return an empty list + */ + static SortKey[] rangeBounds( + int numPartitions, Comparator<StructLike> comparator, ReservoirItemsSketch<SortKey> sketch) { + SortKey[] sortKeys = sketch.getSamples(); + return determineBounds(Math.min(numPartitions, sortKeys.length), comparator, sortKeys); + } + + /** + * This assumes the sort keys have equal weight, which is usually the case for high-cardinality + * scenarios (like device_id, user_id, uuid etc.). 
+ */ + static SortKey[] determineBounds( + int numPartitions, Comparator<StructLike> comparator, SortKey[] sortKeys) { + // sort the keys first + Arrays.sort(sortKeys, comparator); + int numCandidates = numPartitions - 1; + SortKey[] candidates = new SortKey[numCandidates]; + int step = (int) Math.ceil((double) sortKeys.length / numPartitions); + int position = step - 1; + int numChosen = 0; + while (position < sortKeys.length && numChosen < numCandidates) { + SortKey candidate = sortKeys[position]; + // skip duplicate values + if (numChosen > 0 && candidate.equals(candidates[numChosen - 1])) { + // linear probe for the next distinct value + position += 1; + } else { + candidates[numChosen] = candidate; + position += step; + numChosen += 1; + } + } + + return candidates; + } + + /** This can be a bit expensive since it is quadratic. */ + static void convertMapToSketch( + Map<SortKey, Long> taskMapStats, Consumer<SortKey> sketchConsumer) { + taskMapStats.forEach( + (sortKey, count) -> { + for (int i = 0; i < count; ++i) { + sketchConsumer.accept(sortKey); Review Comment: Do we consider to execute the sketchConsumer.accept in parallel to make it faster ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java: ########## @@ -104,30 +144,135 @@ AggregatedStatistics<D, S> updateAndCheckCompletion( subtask, checkpointId); } else { - inProgressStatistics.mergeDataStatistic( + merge(dataStatistics); + LOG.debug( + "Merge data statistics from operator {} subtask {} for checkpoint {}.", operatorName, - event.checkpointId(), - DataStatisticsUtil.deserializeDataStatistics( - event.statisticsBytes(), statisticsSerializer)); + subtask, + checkpointId); } + // This should be the happy path where all subtasks reports are received if (inProgressSubtaskSet.size() == parallelism) { - completedStatistics = inProgressStatistics; + completedStatistics = completedStatistics(); + resetAggregates(); LOG.info( - "Received data 
statistics from all {} operators {} for checkpoint {}. Return last completed aggregator {}.", + "Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator.", parallelism, operatorName, - inProgressStatistics.checkpointId(), - completedStatistics.dataStatistics()); - inProgressStatistics = new AggregatedStatistics<>(checkpointId + 1, statisticsSerializer); - inProgressSubtaskSet.clear(); + inProgressCheckpointId); } return completedStatistics; } + private boolean inProgress() { + return inProgressCheckpointId != CheckpointStoreUtil.INVALID_CHECKPOINT_ID; + } + + private AggregatedStatistics completedStatistics() { + if (coordinatorStatisticsType == StatisticsType.Map) { + LOG.info( + "Completed map statistics aggregation with {} keys", coordinatorMapStatistics.size()); + return AggregatedStatistics.fromKeyFrequency( + inProgressCheckpointId, coordinatorMapStatistics); + } else { + ReservoirItemsSketch<SortKey> sketch = coordinatorSketchStatistics.getResult(); + LOG.info( + "Completed sketch statistics aggregation: " + + "reservoir size = {}, number of items seen = {}, number of samples = {}", + sketch.getK(), + sketch.getN(), + sketch.getNumSamples()); + return AggregatedStatistics.fromRangeBounds( + inProgressCheckpointId, + SketchUtil.rangeBounds(downstreamParallelism, comparator, sketch)); + } + } + + private void initializeAggregates(long checkpointId, DataStatistics taskStatistics) { + LOG.info("Starting a new statistics aggregation for checkpoint {}", checkpointId); + this.inProgressCheckpointId = checkpointId; + this.coordinatorStatisticsType = taskStatistics.type(); + + if (coordinatorStatisticsType == StatisticsType.Map) { + this.coordinatorMapStatistics = Maps.newHashMap(); + this.coordinatorSketchStatistics = null; + } else { + this.coordinatorMapStatistics = null; + this.coordinatorSketchStatistics = + ReservoirItemsUnion.newInstance( + SketchUtil.determineCoordinatorReservoirSize(downstreamParallelism)); + } + 
} + + private void resetAggregates() { + inProgressSubtaskSet.clear(); + this.inProgressCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID; + this.coordinatorMapStatistics = null; + this.coordinatorSketchStatistics = null; + } + + @SuppressWarnings("unchecked") + private void merge(DataStatistics taskStatistics) { + if (taskStatistics.type() == StatisticsType.Map) { + Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) taskStatistics.result(); + if (coordinatorStatisticsType == StatisticsType.Map) { + taskMapStats.forEach((key, count) -> coordinatorMapStatistics.merge(key, count, Long::sum)); + if (coordinatorMapStatistics.size() > switchToSketchThreshold) { Review Comment: So for coordinator, unlike operator which needs to check if StatisticsType = Auto, we will convert it from map to sketch once the size reaches the threshold? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java: ########## @@ -158,30 +185,29 @@ private void ensureStarted() { } private int parallelism() { - return operatorCoordinatorContext.currentParallelism(); + return context.currentParallelism(); } - private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, S> event) { - AggregatedStatistics<D, S> aggregatedStatistics = + private void handleDataStatisticRequest(int subtask, StatisticsEvent event) { + AggregatedStatistics aggregatedStatistics = aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event); if (aggregatedStatistics != null) { completedStatistics = aggregatedStatistics; - sendDataStatisticsToSubtasks( - completedStatistics.checkpointId(), completedStatistics.dataStatistics()); + sendAggregatedStatisticsToSubtasks(completedStatistics.checkpointId(), completedStatistics); } } @SuppressWarnings("FutureReturnValueIgnored") - private void sendDataStatisticsToSubtasks( - long checkpointId, DataStatistics<D, S> globalDataStatistics) { + private void sendAggregatedStatisticsToSubtasks( + long 
checkpointId, AggregatedStatistics globalStatistics) { callInCoordinatorThread( () -> { - DataStatisticsEvent<D, S> dataStatisticsEvent = - DataStatisticsEvent.create(checkpointId, globalDataStatistics, statisticsSerializer); - int parallelism = parallelism(); - for (int i = 0; i < parallelism; ++i) { - subtaskGateways.getSubtaskGateway(i).sendEvent(dataStatisticsEvent); + StatisticsEvent statisticsEvent = + StatisticsEvent.createAggregatedStatisticsEvent( + checkpointId, globalStatistics, aggregatedStatisticsSerializer); + for (int i = 0; i < context.currentParallelism(); ++i) { Review Comment: We have a function #parallelism at line 187 to get the current parallelism. Do we want to remove the function ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.datasketches.sampling.ReservoirItemsSketch; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.StructLike; + +class SketchUtil { + static final int COORDINATOR_MIN_RESERVOIR_SIZE = 10_000; + static final int COORDINATOR_MAX_RESERVOIR_SIZE = 1_000_000; + static final int COORDINATOR_TARGET_PARTITIONS_MULTIPLIER = 100; + static final int OPERATOR_OVER_SAMPLE_RATIO = 10; + + // switch the statistics tracking from map to sketch if the cardinality of the sort key is over + // this threshold. It is hardcoded for now, we can revisit in the future if config is needed. + static final int OPERATOR_SKETCH_SWITCH_THRESHOLD = 10_000; + static final int COORDINATOR_SKETCH_SWITCH_THRESHOLD = 100_000; + + private SketchUtil() {} + + /** + * The larger the reservoir size, the more accurate for range bounds calculation and the more + * balanced range distribution. 
+ * + * <p>Here are the heuristic rules + * <li>Target size: numPartitions x 100 to achieve good accuracy and is easier to calculate the + * range bounds + * <li>Min is 10K to achieve good accuracy while memory footprint is still relatively small + * <li>Max is 1M to cap the memory footprint on coordinator + * + * @param numPartitions number of range partitions which equals to downstream operator parallelism + * @return reservoir size + */ + static int determineCoordinatorReservoirSize(int numPartitions) { + int reservoirSize = numPartitions * COORDINATOR_TARGET_PARTITIONS_MULTIPLIER; + + if (reservoirSize < COORDINATOR_MIN_RESERVOIR_SIZE) { + // adjust it up and still make reservoirSize divisible by numPartitions + int remainder = COORDINATOR_MIN_RESERVOIR_SIZE % numPartitions; + reservoirSize = COORDINATOR_MIN_RESERVOIR_SIZE + (numPartitions - remainder); + } else if (reservoirSize > COORDINATOR_MAX_RESERVOIR_SIZE) { + // adjust it down and still make reservoirSize divisible by numPartitions + int remainder = COORDINATOR_MAX_RESERVOIR_SIZE % numPartitions; + reservoirSize = COORDINATOR_MAX_RESERVOIR_SIZE - remainder; + } + + return reservoirSize; + } + + /** + * Determine the sampling reservoir size where operator subtasks collect data statistics. 
+ * + * <p>Here are the heuristic rules + * <li>Target size is "coordinator reservoir size * over sampling ration (10) / operator + * parallelism" + * <li>Min is 1K to achieve good accuracy while memory footprint is still relatively small + * <li>Max is 100K to cap the memory footprint on coordinator + * + * @param numPartitions number of range partitions which equals to downstream operator parallelism + * @param operatorParallelism data statistics operator parallelism + * @return reservoir size + */ + static int determineOperatorReservoirSize(int operatorParallelism, int numPartitions) { + int coordinatorReservoirSize = determineCoordinatorReservoirSize(numPartitions); + int totalOperatorSamples = coordinatorReservoirSize * OPERATOR_OVER_SAMPLE_RATIO; + return (int) Math.ceil((double) totalOperatorSamples / operatorParallelism); + } + + /** + * To understand how range bounds are used in range partitioning, heere is an example for human Review Comment: typo heere to here ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.datasketches.sampling.ReservoirItemsSketch; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.StructLike; + +class SketchUtil { + static final int COORDINATOR_MIN_RESERVOIR_SIZE = 10_000; + static final int COORDINATOR_MAX_RESERVOIR_SIZE = 1_000_000; + static final int COORDINATOR_TARGET_PARTITIONS_MULTIPLIER = 100; + static final int OPERATOR_OVER_SAMPLE_RATIO = 10; + + // switch the statistics tracking from map to sketch if the cardinality of the sort key is over + // this threshold. It is hardcoded for now, we can revisit in the future if config is needed. + static final int OPERATOR_SKETCH_SWITCH_THRESHOLD = 10_000; + static final int COORDINATOR_SKETCH_SWITCH_THRESHOLD = 100_000; + + private SketchUtil() {} + + /** + * The larger the reservoir size, the more accurate for range bounds calculation and the more + * balanced range distribution. 
+ * + * <p>Here are the heuristic rules + * <li>Target size: numPartitions x 100 to achieve good accuracy and is easier to calculate the + * range bounds + * <li>Min is 10K to achieve good accuracy while memory footprint is still relatively small + * <li>Max is 1M to cap the memory footprint on coordinator + * + * @param numPartitions number of range partitions which equals to downstream operator parallelism + * @return reservoir size + */ + static int determineCoordinatorReservoirSize(int numPartitions) { + int reservoirSize = numPartitions * COORDINATOR_TARGET_PARTITIONS_MULTIPLIER; + + if (reservoirSize < COORDINATOR_MIN_RESERVOIR_SIZE) { + // adjust it up and still make reservoirSize divisible by numPartitions + int remainder = COORDINATOR_MIN_RESERVOIR_SIZE % numPartitions; + reservoirSize = COORDINATOR_MIN_RESERVOIR_SIZE + (numPartitions - remainder); + } else if (reservoirSize > COORDINATOR_MAX_RESERVOIR_SIZE) { + // adjust it down and still make reservoirSize divisible by numPartitions + int remainder = COORDINATOR_MAX_RESERVOIR_SIZE % numPartitions; + reservoirSize = COORDINATOR_MAX_RESERVOIR_SIZE - remainder; + } + + return reservoirSize; + } + + /** + * Determine the sampling reservoir size where operator subtasks collect data statistics. + * + * <p>Here are the heuristic rules + * <li>Target size is "coordinator reservoir size * over sampling ration (10) / operator + * parallelism" + * <li>Min is 1K to achieve good accuracy while memory footprint is still relatively small + * <li>Max is 100K to cap the memory footprint on coordinator Review Comment: From the current implementation, operator reservoir size depends on coordinator reservoir size completely. Do we check the operator reservoir min max value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org