mcvsubbu commented on code in PR #14214:
URL: https://github.com/apache/pinot/pull/14214#discussion_r1799973196


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager, 
String resourceName,
           Entry first = queue._pending.poll();
           processed.add(first);
           String mergedResourceName = first._resourceName;
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
 
-          /**
-           * If the local cache does not contain a value, need to check if 
there is a
-           * value in ZK; use it as initial value if exists
-           */
-          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
-          first._updatedIdealState = updatedIdealState;
-          Iterator<Entry> it = queue._pending.iterator();
-          while (it.hasNext()) {
-            Entry ent = it.next();
-            if (!ent._resourceName.equals(mergedResourceName)) {
-              continue;
-            }
-            processed.add(ent);
-            updatedIdealState = ent._updater.apply(idealStateCopy);
-            ent._updatedIdealState = updatedIdealState;
-            it.remove();
-          }
-          IdealState finalUpdatedIdealState = updatedIdealState;
-          updateIdealState(helixManager, resourceName, anyIdealState -> 
finalUpdatedIdealState,
+          updateIdealState(helixManager, mergedResourceName, idealState -> {
+                // Make a copy of the idealState above to pass it to the 
updater
+                // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields
+                // and list fields
+                IdealState idealStateCopy = 
HelixHelper.cloneIdealState(idealState);
+                IdealState updatedIdealState = 
first._updater.apply(idealStateCopy);
+                first._updatedIdealState = updatedIdealState;
+                Iterator<Entry> it = queue._pending.iterator();
+                while (it.hasNext()) {
+                  Entry ent = it.next();
+                  if (!ent._resourceName.equals(mergedResourceName)) {
+                    continue;
+                  }
+                  processed.add(ent);
+                  updatedIdealState = ent._updater.apply(updatedIdealState);
+                  ent._updatedIdealState = updatedIdealState;
+                  it.remove();
+                }
+                return updatedIdealState;
+              },
               retryPolicy, noChangeOk);

Review Comment:
   If I understand the code+fix correctly,
   - We could have multiple threads adding different segments to the idealstate.
   - This case is fixed in line 142 by passing `updatedIdealState` to the 
updater, so that all updates are applied _cumulatively_.
   - The first thread that enters this logic may end updating all of the 
segments that other threads have added to the queue.
   - The other threads are busy-waiting, and will find the queue empty when 
their turn comes (i.e. they set `queue._running` to themselves and get to 
examine the queue), and will return SUCCESS to the caller (irrespective of 
whether the updating thread has succeeded or not).
   
   Is my understanding right? If so, we should add a status of some sort to 
each entry, and have only the thread that enters the entry remove it, so that 
all threads can return errors. Otherwise, the inbound call (a segment push) 
that caused the first thread to happen will think that it could not push the 
segment, and a retry may succeed pushing that segment. The other segment 
pushers will think that their push succeeded and may never retry.
   
   @dinoocch or @jasperjiaguo can you validate this?
   
   In fact, things may get more interesting if the retries start to get added 
as new entries ... (can that happen?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to