yashmayya commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2395439947


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -20,120 +20,294 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.AttemptFailureException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver 
{
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class);
+  public static final int DEFAULT_ZK_UPDATE_MAX_RETRIES = 30;
+  private static final int ZK_UPDATE_RETRY_WAIT_MS = 1000;
 
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private final String _jobId;
   private final String _tenantName;
-  private final List<String> _unprocessedTables;
-  private final TenantRebalanceProgressStats _progressStats;
-  private final TenantRebalanceContext _tenantRebalanceContext;
   // Keep track of number of updates. Useful during debugging.
-  private int _numUpdatesToZk;
+  private final AtomicInteger _numUpdatesToZk;
+  private final int _zkUpdateMaxRetries;
   private boolean _isDone;
 
-  public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, 
TenantRebalanceProgressStats progressStats,
-      TenantRebalanceContext tenantRebalanceContext,
-      PinotHelixResourceManager pinotHelixResourceManager) {
+  public ZkBasedTenantRebalanceObserver(String jobId, String tenantName,

Review Comment:
   Should this constructor be private, or merged with some of the other 
constructors? It doesn't look like it's intended to be called directly from 
outside, given that it doesn't update ZK.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java:
##########
@@ -20,120 +20,294 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.AttemptFailureException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver 
{
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class);
+  public static final int DEFAULT_ZK_UPDATE_MAX_RETRIES = 30;

Review Comment:
   This is way too high, IMO. How did we arrive at this number for the default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to