dinoocch commented on code in PR #14214:
URL: https://github.com/apache/pinot/pull/14214#discussion_r1800003998


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager, String resourceName,
           Entry first = queue._pending.poll();
           processed.add(first);
           String mergedResourceName = first._resourceName;
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
 
-          /**
-           * If the local cache does not contain a value, need to check if there is a
-           * value in ZK; use it as initial value if exists
-           */
-          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
-          first._updatedIdealState = updatedIdealState;
-          Iterator<Entry> it = queue._pending.iterator();
-          while (it.hasNext()) {
-            Entry ent = it.next();
-            if (!ent._resourceName.equals(mergedResourceName)) {
-              continue;
-            }
-            processed.add(ent);
-            updatedIdealState = ent._updater.apply(idealStateCopy);
-            ent._updatedIdealState = updatedIdealState;
-            it.remove();
-          }
-          IdealState finalUpdatedIdealState = updatedIdealState;
-          updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
+          updateIdealState(helixManager, mergedResourceName, idealState -> {
+                // Make a copy of the idealState above to pass it to the updater
+                // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields
+                // and list fields
+                IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+                IdealState updatedIdealState = first._updater.apply(idealStateCopy);
+                first._updatedIdealState = updatedIdealState;
+                Iterator<Entry> it = queue._pending.iterator();
+                while (it.hasNext()) {
+                  Entry ent = it.next();
+                  if (!ent._resourceName.equals(mergedResourceName)) {
+                    continue;
+                  }
+                  processed.add(ent);
+                  updatedIdealState = ent._updater.apply(updatedIdealState);
+                  ent._updatedIdealState = updatedIdealState;
+                  it.remove();
+                }
+                return updatedIdealState;
+              },
               retryPolicy, noChangeOk);

Review Comment:
   @mcvsubbu do you mean the case where writing to ZK throws an exception?
   
   I'm not fully sure of the intent, but the current code has the following behavior:
   
   * The executing thread will throw the exception
   * Any grouped executions would `return null`
   
   In particular, the code uses `Entry._sent` to mark an entry as already processed, and that flag is the condition for the while loop. So I believe the worst case is that a waiting thread either (a) runs one extra iteration of the processing or (b) waits 10ms.
   
   We _do_ lose the exception in all other threads, though. I don't know whether that is an issue, or whether `null` is enough to imply an error.
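
To make the behavior described above concrete, here is a minimal single-threaded sketch. It is not the code in `IdealStateGroupCommit`: the class `GroupCommitSketch`, the `Writer` interface, the `String` state, and both method names are hypothetical, and it assumes a result is only published to grouped entries after a successful write. It only models the two outcomes listed above: the executing caller sees the exception, while a grouped caller sees `sent == true` and a `null` result.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of the group-commit behavior discussed above; not Pinot code.
final class GroupCommitSketch {
  /** Stand-in for the real Entry, reduced to the fields the comment mentions. */
  static final class Entry {
    final UnaryOperator<String> updater;
    volatile boolean sent;         // plays the role of Entry._sent
    volatile String updatedState;  // stays null if the write never succeeded (assumption)

    Entry(UnaryOperator<String> updater) {
      this.updater = updater;
    }
  }

  /** Hypothetical writer; an implementation may throw to simulate a failed ZK write. */
  interface Writer {
    void write(String state);
  }

  /** Executing thread: chain every queued updater, attempt one write, always mark entries sent. */
  static String commitAsLeader(List<Entry> batch, String current, Writer writer) {
    String merged = current;
    try {
      for (Entry e : batch) {
        merged = e.updater.apply(merged);  // each updater sees the previous result
      }
      writer.write(merged);                // may throw; the exception propagates to this caller
      for (Entry e : batch) {
        e.updatedState = merged;           // published only after a successful write
      }
      return merged;
    } finally {
      for (Entry e : batch) {
        e.sent = true;                     // grouped callers stop waiting either way
      }
    }
  }

  /** Grouped caller: once sent is set, return whatever was published (possibly null). */
  static String resultForFollower(Entry e) {
    return e.sent ? e.updatedState : null;
  }
}
```

In this model the grouped caller cannot distinguish "the write failed" from any other `null` result, which is the lost-exception concern raised above. Storing the exception on each entry and rethrowing it in the grouped caller would be one way to surface it, if that turns out to matter.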



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

