yegangy0718 commented on code in PR #6382: URL: https://github.com/apache/iceberg/pull/6382#discussion_r1065372357
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsFactory.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +/** + * DataStatisticsFactory provides the DataStatistics definition for different mode like HASH, RANGE + */ +class DataStatisticsFactory<K> { + + DataStatistics<K> createDataStatistics() { Review Comment: The factory will create different types of DataStatistics based on different scenarios, like `MapStatistics` for low-cardinality and `SketchStatistics`/`DigestStatistics` for high-cardinality. Thus I would prefer to keep it general. Regarding whether we need this factory, it depends. We can either use ShuffleOperatorFactory to get the config (low or high cardinality, or range mode) and then pass the right data statistics to the operator and coordinator, or use this factory and pass it into the operator and coordinator to create DataStatistics by calling this function. 
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve 
data clustering based on the key. Review Comment: will update ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** MapDataStatistics uses map to count key frequency */ +class MapDataStatistics<K> implements DataStatistics<K> { + private final Map<K, Long> dataStatistics = Maps.newHashMap(); + + @Override + public long size() { + return dataStatistics.size(); + } + + @Override + public void put(K key) { + dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + 1L); + } + + @Override + public void merge(DataStatistics<K> other) { + Preconditions.checkArgument( + other instanceof MapDataStatistics, "Can not merge this type of statistics: " + other); + MapDataStatistics<K> mapDataStatistic = (MapDataStatistics<K>) other; + mapDataStatistic.dataStatistics.forEach( + (key, count) -> dataStatistics.put(key, dataStatistics.getOrDefault(key, 0L) + count)); + } + + @VisibleForTesting Review Comment: yes, right. I will remove the annotation ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. + * + * <p>It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. 
+ */ +class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>> + implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector<T, K> keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics<K> localDataStatistics; + private transient DataStatistics<K> globalDataStatistics; + private transient ListState<DataStatistics<K>> globalDataStatisticsState; + private transient DataStatisticsFactory<K> dataStatisticsFactory; + + public ShuffleOperator( + KeySelector<T, K> keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory<K> dataStatisticsFactory) { + this.keySelector = keySelector; + this.operatorEventGateway = operatorEventGateway; + this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() { + return new ListStateDescriptor<>( + "globalDataStatisticsState", TypeInformation.of(new TypeHint<DataStatistics<K>>() {})); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + localDataStatistics = dataStatisticsFactory.createDataStatistics(); + globalDataStatisticsState = Review Comment: will add the check to line 81 ``` if (context.isRestored() && globalStatisticsState.get() != null && globalStatisticsState.get().iterator().hasNext()) ``` since we still need to initialize the globalDataStatisticsState variable. 
########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestShuffleOperator.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; 
+import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestShuffleOperator { + private ShuffleOperator<String, String> operator; + + private Environment getTestingEnvironment() { + return new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1L, + new MockInputSplitProvider(), + 1, + new TestTaskStateManager()); + } + + @Before + public void before() throws Exception { + MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); + KeySelector<String, String> keySelector = + new KeySelector<String, String>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public String getKey(String value) { + return value; + } + }; + DataStatisticsFactory<String> dataStatisticsFactory = new DataStatisticsFactory<>(); + + this.operator = new ShuffleOperator<>(keySelector, mockGateway, dataStatisticsFactory); + Environment env = getTestingEnvironment(); + this.operator.setup( + new OneInputStreamTask<String, String>(env), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(Lists.newArrayList())); + } + + @After + public void clean() throws 
Exception { + operator.close(); + } + + @Test + public void testInitializeState() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + + assertNotNull( + stateContext + .getOperatorStateStore() + .getListState(operator.generateGlobalDataDistributionWeightDescriptor())); + } + + @Test + public void testProcessElement() throws Exception { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + operator.processElement(new StreamRecord<>("a")); + operator.processElement(new StreamRecord<>("a")); + operator.processElement(new StreamRecord<>("b")); + assertTrue(operator.localDataStatistics() instanceof MapDataStatistics); + MapDataStatistics<String> mapDataStatistics = + (MapDataStatistics<String>) operator.localDataStatistics(); + assertTrue(mapDataStatistics.dataStatistics().containsKey("a")); + assertTrue(mapDataStatistics.dataStatistics().containsKey("b")); + assertEquals(2L, (long) mapDataStatistics.dataStatistics().get("a")); + assertEquals(1L, (long) mapDataStatistics.dataStatistics().get("b")); + } + + @Test + public void testOperatorOutput() throws Exception { + try (OneInputStreamOperatorTestHarness<String, ShuffleRecordWrapper<String, String>> + testHarness = createHarness(this.operator)) { + testHarness.processElement(new StreamRecord<>("a")); + testHarness.processElement(new StreamRecord<>("b")); + testHarness.processElement(new StreamRecord<>("b")); + + List<String> recordsOutput = + testHarness.extractOutputValues().stream() + .filter(ShuffleRecordWrapper::hasRecord) + .map(ShuffleRecordWrapper::record) + .collect(Collectors.toList()); + assertThat(recordsOutput) + .containsExactlyInAnyOrderElementsOf(ImmutableList.of("a", "b", "b")); + } + } + + // ---------------- helper methods ------------------------- Review Comment: will remove ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: 
########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve data clustering based on the key. 
+ * + * <p>It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>> + implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector<T, K> keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics<K> localDataStatistics; + private transient DataStatistics<K> globalDataStatistics; + private transient ListState<DataStatistics<K>> globalDataStatisticsState; + private transient DataStatisticsFactory<K> dataStatisticsFactory; + + public ShuffleOperator( + KeySelector<T, K> keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory<K> dataStatisticsFactory) { + this.keySelector = keySelector; + this.operatorEventGateway = operatorEventGateway; + this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() { Review Comment: we use it in unit test now to get the state to make sure state is not null like what SourceOperatorTest did https://github.com/apache/flink/blob/68b37fb867374df5a201f0b170e35c21266e5d7b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java#L87 I was thinking we can use it to verify globalStatisticsState was updated as expected after taking snapshot. But actually, we can also create a test-only visible function to return globalStatisticsState directly if we need to check its value. 
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/ShuffleOperator.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; + +/** + * Shuffle operator can help to improve 
data clustering based on the key. + * + * <p>It collects the data statistics information, sends to coordinator and gets the global data + * distribution weight from coordinator. Then it will ingest the weight into data stream(wrap by a + * class{@link ShuffleRecordWrapper}) and send to partitioner. + */ +class ShuffleOperator<T, K> extends AbstractStreamOperator<ShuffleRecordWrapper<T, K>> + implements OneInputStreamOperator<T, ShuffleRecordWrapper<T, K>>, OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final KeySelector<T, K> keySelector; + private final OperatorEventGateway operatorEventGateway; + // key is generated by applying KeySelector to record + // value is the times key occurs + // TODO: support to store statistics for high cardinality cases + private transient DataStatistics<K> localDataStatistics; + private transient DataStatistics<K> globalDataStatistics; + private transient ListState<DataStatistics<K>> globalDataStatisticsState; + private transient DataStatisticsFactory<K> dataStatisticsFactory; + + public ShuffleOperator( + KeySelector<T, K> keySelector, + OperatorEventGateway operatorEventGateway, + DataStatisticsFactory<K> dataStatisticsFactory) { + this.keySelector = keySelector; + this.operatorEventGateway = operatorEventGateway; + this.dataStatisticsFactory = dataStatisticsFactory; + } + + @VisibleForTesting + ListStateDescriptor<DataStatistics<K>> generateGlobalDataDistributionWeightDescriptor() { + return new ListStateDescriptor<>( + "globalDataStatisticsState", TypeInformation.of(new TypeHint<DataStatistics<K>>() {})); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + localDataStatistics = dataStatisticsFactory.createDataStatistics(); + globalDataStatisticsState = + context + .getOperatorStateStore() + .getListState(generateGlobalDataDistributionWeightDescriptor()); + + if (globalDataStatisticsState.get() != null + && 
globalDataStatisticsState.get().iterator().hasNext()) { + globalDataStatistics = globalDataStatisticsState.get().iterator().next(); + } else { + globalDataStatistics = dataStatisticsFactory.createDataStatistics(); Review Comment: We will always check `globalStatistics != null && globalStatistics.size() > 0` to decide whether to send globalStatistics to partitioner. We don't have any special action for cold start yet. Leaving it to empty doesn't bring too much benefit as of now. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapDataStatistics.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** MapDataStatistics uses map to count key frequency */ +class MapDataStatistics<K> implements DataStatistics<K> { + private final Map<K, Long> dataStatistics = Maps.newHashMap(); + + @Override + public long size() { + return dataStatistics.size(); + } + + @Override + public void put(K key) { Review Comment: will rename the method to `add` in the latest commit ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatistics.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +/** + * DataStatistics defines the interface to collect data statistics. + * + * <p>{@link ShuffleOperator} will store local data statistics and later distribute the global Review Comment: Makes sense to me. Will update in the latest commit -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org