zhtaoxiang commented on code in PR #11740: URL: https://github.com/apache/pinot/pull/11740#discussion_r1348079942
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2063,28 +2066,76 @@ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId
         ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
   }
 
+  /**
+   * Adds a new job metadata for controller job like table rebalance or reload into ZK
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobResourcePath zkPath to add the new job metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
   public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobResourcePath) {
+    return addControllerJobToZK(jobId, jobMetadata, jobResourcePath, prev -> true);
+  }
+
+  /**
+   * Adds a new job metadata for controller job like table rebalance or reload into ZK
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobResourcePath zkPath to add the new job metadata
+   * @param prevJobMetadataChecker to check the previous job metadata before adding new one

Review Comment:
   Could we add some explanation on what specifically to check?

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -126,30 +143,74 @@ public int getNumUpdatesToZk() {
     return _numUpdatesToZk;
   }
 
+  @VisibleForTesting
+  TableRebalanceRetryConfig getTableRebalanceJobRetryConfig() {
+    return _tableRebalanceJobRetryConfig;
+  }
+
   private void trackStatsInZk() {
+    Map<String, String> jobMetadata =
+        createJobMetadata(_tableNameWithType, _rebalanceJobId, _tableRebalanceProgressStats,
+            _tableRebalanceJobRetryConfig);
+    _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId, jobMetadata,
+        ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE),
+        prevJobMetadata -> {
+          // Abort the job when we're sure it has failed, otherwise continue to update the status.
+          if (prevJobMetadata == null) {
+            return true;
+          }
+          String prevStatsInStr = prevJobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+          TableRebalanceProgressStats prevStats;
+          try {
+            prevStats = JsonUtils.stringToObject(prevStatsInStr, TableRebalanceProgressStats.class);
+          } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+          }

Review Comment:
   Should we log the exception and return false?

##########
docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml:
##########
@@ -100,6 +100,12 @@ rules:
   labels:
     table: "$1"
     taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.([^\\.]*?)\\.(\\w+).tableRebalanceExecutionTimeMs\"><>(\\w+)"

Review Comment:
   We also need to add this to the pinot.yml file.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org