stevenzwu commented on code in PR #10457:
URL: https://github.com/apache/iceberg/pull/10457#discussion_r1630376841


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -189,28 +198,75 @@ private void handleDataStatisticRequest(int subtask, StatisticsEvent event) {
         aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
 
     if (aggregatedStatistics != null) {
-      completedStatistics = aggregatedStatistics;
-      sendAggregatedStatisticsToSubtasks(completedStatistics.checkpointId(), completedStatistics);
+      // completedStatistics contains the complete samples, which is needed to compute
+      // the range bounds in globalStatistics if downstreamParallelism changed.
+      this.completedStatistics = aggregatedStatistics;
+      // globalStatistics only contains the range bounds, which has size equal to
+      // downstreamParallelism - 1.
+      this.globalStatistics =
+          globalStatistics(aggregatedStatistics, downstreamParallelism, comparator);
+      sendGlobalStatisticsToSubtasks(globalStatistics);
+    }
+  }
+
+  private static AggregatedStatistics globalStatistics(
+      AggregatedStatistics aggregatedStatistics,
+      int downstreamParallelism,
+      Comparator<StructLike> comparator) {
+    if (aggregatedStatistics.type() == StatisticsType.Map) {
+      return aggregatedStatistics;
+    } else {
+      // range bound is a much smaller array compared to the complete samples.
+      // It helps reduce the amount of data transfer from coordinator to operator subtasks.
+      SortKey[] rangeBounds =
+          SketchUtil.rangeBounds(
+              downstreamParallelism, comparator, aggregatedStatistics.keySamples());
+      return AggregatedStatistics.fromKeySamples(aggregatedStatistics.checkpointId(), rangeBounds);
     }
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")
-  private void sendAggregatedStatisticsToSubtasks(
-      long checkpointId, AggregatedStatistics globalStatistics) {
+  private void sendGlobalStatisticsToSubtasks(AggregatedStatistics statistics) {
     callInCoordinatorThread(
         () -> {
+          // applyImmediately is set to false so that operator subtasks can apply the change at
+          // checkpoint boundary
           StatisticsEvent statisticsEvent =
               StatisticsEvent.createAggregatedStatisticsEvent(
-                  checkpointId, globalStatistics, aggregatedStatisticsSerializer);
+                  statistics, aggregatedStatisticsSerializer, false);
           for (int i = 0; i < context.currentParallelism(); ++i) {
+            // Ignore future return value for potential error, e.g. subtask down
             subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent);
           }
-

Review Comment:
   Haha, this is a good question. I guess we were just avoiding the complexity
   of error handling for the `CompletableFuture` returned by `sendEvent`.
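
For illustration, handling those futures could look roughly like the sketch below. It is not code from this PR: `SendWithErrorHandlingSketch`, `sendToAll`, and the `sender` function are hypothetical names, and `sender` merely stands in for `subtaskGateways.getSubtaskGateway(i).sendEvent(...)`. The idea is to record which subtasks failed instead of dropping the returned futures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hypothetical sketch: collect failed sends instead of ignoring the futures.
final class SendWithErrorHandlingSketch {
  private SendWithErrorHandlingSketch() {}

  /**
   * Calls sender once per subtask and returns a future holding the indices of
   * subtasks whose send completed exceptionally (e.g. subtask down).
   */
  static CompletableFuture<List<Integer>> sendToAll(
      int parallelism, IntFunction<CompletableFuture<Void>> sender) {
    List<Integer> failedSubtasks = Collections.synchronizedList(new ArrayList<>());
    CompletableFuture<?>[] pending = new CompletableFuture<?>[parallelism];
    for (int i = 0; i < parallelism; ++i) {
      final int subtask = i;
      pending[i] =
          sender
              .apply(subtask)
              // Record the failure; a real coordinator might log or schedule a resend.
              .whenComplete(
                  (ack, error) -> {
                    if (error != null) {
                      failedSubtasks.add(subtask);
                    }
                  })
              // Swallow the error so one failed send does not fail the whole batch.
              .exceptionally(error -> null);
    }
    return CompletableFuture.allOf(pending).thenApply(ignored -> failedSubtasks);
  }
}
```

Even with this, a failed send still leaves the subtask without the latest statistics, so something has to redeliver them later.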
   
   Now, coming back to error handling: what we really need is a
   reconciliation process/protocol. I think we can compute a hash code / signature
   in the `GlobalStatistics`. Subtasks always send a `RequestStatisticsEvent`
   with the signature of their local copy upon startup. The coordinator can check
   whether the signature matches and, if it does not, resend the latest statistics.
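
A rough sketch of the handshake I have in mind is below. Everything in it is an assumption for discussion, not existing Iceberg code: `GlobalStats` is a hypothetical stand-in for `GlobalStatistics`, the signature is a plain content hash, and `onRequest` models the coordinator receiving a `RequestStatisticsEvent` carrying the subtask's signature (`null` when the subtask has no local copy yet).

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical sketch of signature-based reconciliation between coordinator and subtasks.
final class StatisticsReconciliationSketch {
  private StatisticsReconciliationSketch() {}

  /** Stand-in for GlobalStatistics: checkpoint id plus serialized range bounds. */
  static final class GlobalStats {
    final long checkpointId;
    final byte[] serializedBounds;

    GlobalStats(long checkpointId, byte[] serializedBounds) {
      this.checkpointId = checkpointId;
      this.serializedBounds = serializedBounds.clone();
    }

    /** Content-derived signature; a real implementation might prefer a stronger hash. */
    int signature() {
      return 31 * Long.hashCode(checkpointId) + Arrays.hashCode(serializedBounds);
    }
  }

  /** Coordinator side: keeps the latest statistics and answers subtask requests. */
  static final class Coordinator {
    private GlobalStats latest;

    void update(GlobalStats stats) {
      this.latest = stats;
    }

    /**
     * Returns the statistics to resend, or empty if the coordinator has nothing
     * yet or the subtask's signature already matches the latest statistics.
     */
    Optional<GlobalStats> onRequest(Integer subtaskSignature) {
      if (latest == null) {
        return Optional.empty();
      }
      if (subtaskSignature != null && subtaskSignature.intValue() == latest.signature()) {
        return Optional.empty();
      }
      return Optional.of(latest);
    }
  }
}
```

With this shape, a subtask that missed an event (for example because the send failed while it was down) converges on its next startup request, so the send path itself can stay fire-and-forget.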



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
