dinoocch commented on code in PR #14237:
URL: https://github.com/apache/pinot/pull/14237#discussion_r1801704162


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,35 +125,44 @@ public IdealState commit(HelixManager helixManager, String resourceName,
           Entry first = queue._pending.poll();
           processed.add(first);
           String mergedResourceName = first._resourceName;
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
-          /**
-           * If the local cache does not contain a value, need to check if there is a
-           * value in ZK; use it as initial value if exists
-           */
-          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
-          first._updatedIdealState = updatedIdealState;
-          Iterator<Entry> it = queue._pending.iterator();
-          while (it.hasNext()) {
-            Entry ent = it.next();
-            if (!ent._resourceName.equals(mergedResourceName)) {
-              continue;
+          IdealState response = updateIdealState(helixManager, mergedResourceName, idealState -> {
+            IdealState updatedIdealState = first._updater.apply(idealState);
+            first._updatedIdealState = updatedIdealState;
+            first._exception = null;
+            if (processed.size() > 1) {
+              queue._pending.addAll(processed.subList(1, processed.size()));
+              processed.clear();
+              processed.add(first);
+            }
+            Iterator<Entry> it = queue._pending.iterator();
+            while (it.hasNext()) {
+              Entry ent = it.next();
+              if (!ent._resourceName.equals(mergedResourceName)) {
+                continue;
+              }
+              processed.add(ent);
+              updatedIdealState = ent._updater.apply(updatedIdealState);
+              ent._updatedIdealState = updatedIdealState;
+              ent._exception = null;
+              it.remove();
             }
-            processed.add(ent);
-            updatedIdealState = ent._updater.apply(idealStateCopy);
-            ent._updatedIdealState = updatedIdealState;
-            it.remove();
+            return updatedIdealState;
+          }, retryPolicy, noChangeOk);
+          if (response == null) {
+            RuntimeException ex = new RuntimeException("Failed to update IdealState");
+            for (Entry ent : processed) {
+              ent._exception = ex;
+              ent._updatedIdealState = null;
+            }
+            throw ex;
+          }
+        } catch (Throwable e) {
+          // If the update failed, we should re-add all entries to the queue
+          for (Entry ent : processed) {
+            ent._exception = e;
+            ent._updatedIdealState = null;
           }
-          IdealState finalUpdatedIdealState = updatedIdealState;
-          updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
-              retryPolicy, noChangeOk);
+          throw e;

Review Comment:
   Hmm, so what I mean to say is this.
   
   The caller's interface is:
   
   ```java
   public IdealState commit(HelixManager helixManager, String resourceName,
       Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk);
   ```
   
   Since we hash into 100 buckets, there is some chance (though quite low) that
   two different resources land in the same queue, so we may need to process
   some other resource's Entry before the one we are interested in.
   
   So the caller might be interested in `resourceName <- A` while
   `mergedResourceName <- B`. An exception raised while trying to process `B`
   should not be thrown to the caller for `A`, since it is unrelated to the
   caller's intent.
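
To make the concern concrete, here is a minimal sketch of the behavior being asked for, not the actual Pinot code. Everything in it is a hypothetical simplification: `GroupCommitSketch`, its `Entry` class, and the two-argument `commit` are stand-ins, a `String` plays the role of `IdealState`, and there is no Helix, retry policy, or merging of updaters. It only shows a failure being recorded on the entry it belongs to, with the caller checking its own entry afterwards.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of the group commit: a String stands in for IdealState.
final class GroupCommitSketch {

  /** Hypothetical stand-in for the queue Entry in the diff. */
  static final class Entry {
    final String resourceName;
    final UnaryOperator<String> updater;
    String updatedState; // set when this entry's update succeeds
    Throwable exception; // set when this entry's update fails

    Entry(String resourceName, UnaryOperator<String> updater) {
      this.resourceName = resourceName;
      this.updater = updater;
    }
  }

  /** Drains the shared queue, then reports only the outcome of the caller's own entry. */
  static String commit(Deque<Entry> pending, Entry mine) {
    Entry ent;
    while ((ent = pending.poll()) != null) {
      try {
        ent.updatedState = ent.updater.apply(ent.resourceName + ":v0");
        ent.exception = null;
      } catch (RuntimeException e) {
        // Record the failure on the entry it belongs to instead of rethrowing it here.
        ent.exception = e;
        ent.updatedState = null;
      }
    }
    // Only a failure of the caller's own entry is surfaced to the caller.
    if (mine.exception != null) {
      throw new IllegalStateException("Failed to update " + mine.resourceName, mine.exception);
    }
    return mine.updatedState;
  }

  public static void main(String[] args) {
    Deque<Entry> pending = new ArrayDeque<>();
    Entry b = new Entry("B", state -> {
      throw new IllegalStateException("update for B failed");
    });
    Entry a = new Entry("A", state -> state + "+updated");
    pending.add(b); // B happens to sit ahead of A in the same bucket
    pending.add(a);
    System.out.println(commit(pending, a));       // the caller for A gets A's result
    System.out.println(b.exception.getMessage()); // B's failure stays on B's entry
  }
}
```

In this sketch, whichever thread later waits on `B` would see `b.exception`, while the caller for `A` gets its own result even though `B` failed first on the same thread.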



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

