mcvsubbu commented on code in PR #14214:
URL: https://github.com/apache/pinot/pull/14214#discussion_r1799973196
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########

```diff
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager, String resourceName,
           Entry first = queue._pending.poll();
           processed.add(first);
           String mergedResourceName = first._resourceName;
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-          /**
-           * If the local cache does not contain a value, need to check if there is a
-           * value in ZK; use it as initial value if exists
-           */
-          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
-          first._updatedIdealState = updatedIdealState;
-          Iterator<Entry> it = queue._pending.iterator();
-          while (it.hasNext()) {
-            Entry ent = it.next();
-            if (!ent._resourceName.equals(mergedResourceName)) {
-              continue;
-            }
-            processed.add(ent);
-            updatedIdealState = ent._updater.apply(idealStateCopy);
-            ent._updatedIdealState = updatedIdealState;
-            it.remove();
-          }
-          IdealState finalUpdatedIdealState = updatedIdealState;
-          updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
+          updateIdealState(helixManager, mergedResourceName, idealState -> {
+            // Make a copy of the idealState above to pass it to the updater
+            // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields
+            // and list fields
+            IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+            IdealState updatedIdealState = first._updater.apply(idealStateCopy);
+            first._updatedIdealState = updatedIdealState;
+            Iterator<Entry> it = queue._pending.iterator();
+            while (it.hasNext()) {
+              Entry ent = it.next();
+              if (!ent._resourceName.equals(mergedResourceName)) {
+                continue;
+              }
+              processed.add(ent);
+              updatedIdealState = ent._updater.apply(updatedIdealState);
+              ent._updatedIdealState = updatedIdealState;
+              it.remove();
+            }
+            return updatedIdealState;
+          },
               retryPolicy, noChangeOk);
```

Review Comment:
If I understand the code and the fix correctly:

- We could have multiple threads adding different segments to the idealstate.
- This case is fixed in line 142 by passing `updatedIdealState` (instead of `idealStateCopy`) to each subsequent updater, so that all updates are applied _cumulatively_.
- The first thread that enters this logic may end up updating all of the segments that other threads have added to the queue.
- The other threads are busy-waiting. When their turn comes (i.e., they set `queue._running` to themselves and get to examine the queue), they will find the queue empty and return SUCCESS to the caller, irrespective of whether the updating thread succeeded or not.

Is my understanding right? If so, we should add a status of some sort to each entry and have only the thread that enqueued an entry remove it, so that every thread can return an error. Otherwise, if the update fails, the inbound call (a segment push) that triggered the first thread will see the failure, and a retry may succeed in pushing that segment, but the other segment pushers will think their pushes succeeded and may never retry.

@dinoocch or @jasperjiaguo, can you validate this?

In fact, things may get more interesting if the retries start getting added as new entries... (can that happen?)
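One effect of the diff is that the merge now runs inside the lambda passed to `updateIdealState`, instead of being computed once up front and written through `anyIdealState -> finalUpdatedIdealState`, which ignores whatever ideal state is current when the write is attempted. The Java sketch below illustrates the general read-modify-write pattern this moves toward, assuming that each write is conditional on the version that was read (as with a ZooKeeper version check) and that a failed attempt re-reads the state and re-applies the updaters. `Snapshot`, `VersionedStore`, `GroupWriter`, `commitBatch`, and `addSegment` are hypothetical names for illustration only; this is not the Helix or Pinot API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Hypothetical snapshot of a versioned record (a stand-in for a znode plus its version).
final class Snapshot {
  final Map<String, String> value;
  final int version;

  Snapshot(Map<String, String> value, int version) {
    this.value = value;
    this.version = version;
  }
}

// Hypothetical in-memory stand-in for a stored ideal state: segment name -> state.
final class VersionedStore {
  private Map<String, String> value = new TreeMap<>();
  private int version = 0;

  // Returns a private copy together with the version it was read at.
  synchronized Snapshot read() {
    return new Snapshot(new TreeMap<>(value), version);
  }

  // Succeeds only if nobody has written since expectedVersion was read.
  synchronized boolean compareAndSet(int expectedVersion, Map<String, String> newValue) {
    if (version != expectedVersion) {
      return false;
    }
    value = new TreeMap<>(newValue);
    version++;
    return true;
  }
}

final class GroupWriter {
  // Hypothetical updater that returns a new map instead of mutating its input,
  // so an update is lost unless its output is chained into the next updater.
  static UnaryOperator<Map<String, String>> addSegment(String segment) {
    return current -> {
      Map<String, String> next = new TreeMap<>(current);
      next.put(segment, "ONLINE");
      return next;
    };
  }

  // On every attempt, re-reads the latest state and re-applies the whole batch to it,
  // feeding each updater's output into the next one.
  static boolean commitBatch(VersionedStore store, List<UnaryOperator<Map<String, String>>> batch,
      int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Snapshot current = store.read();
      Map<String, String> updated = current.value; // already a private copy
      for (UnaryOperator<Map<String, String>> updater : batch) {
        updated = updater.apply(updated);
      }
      if (store.compareAndSet(current.version, updated)) {
        return true;
      }
      // Version conflict: another writer got in first, so retry against its result.
    }
    return false;
  }
}
```

In this sketch the batch is fixed before the retry loop starts, so every attempt re-applies the same updaters to the freshly read state, and a write made by someone else between the read and the conditional write is preserved rather than overwritten.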
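The comment's proposal, giving each entry its own status so that every waiting caller can report a failure, could look roughly like the Java sketch below. It is a variant rather than a literal transcription of the suggestion: the leader still drains the queue, but it records the outcome of the single merged write on every entry it drained, and each caller returns the outcome stored on its own entry. `StatusGroupCommit`, `Entry`, `commit`, and the `writer` predicate are hypothetical names; `writer` stands in for whatever performs the merged ideal-state update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Hypothetical group commit in which every queued entry carries its own outcome,
// so each caller returns the result of the write that actually included its update.
final class StatusGroupCommit<T> {
  private final class Entry {
    final T update;
    final CompletableFuture<Boolean> outcome = new CompletableFuture<>();

    Entry(T update) {
      this.update = update;
    }
  }

  private final ConcurrentLinkedQueue<Entry> pending = new ConcurrentLinkedQueue<>();
  private final Object leaderLock = new Object();
  // Hypothetical writer: persists one merged batch and returns true on success.
  private final Predicate<List<T>> writer;

  StatusGroupCommit(Predicate<List<T>> writer) {
    this.writer = writer;
  }

  boolean commit(T update) {
    Entry mine = new Entry(update);
    pending.add(mine);
    synchronized (leaderLock) {
      // If an earlier leader already wrote a batch containing our entry, it also
      // completed our outcome before releasing the lock, so there is nothing to do.
      if (!mine.outcome.isDone()) {
        // Leader: drain everything queued so far (our own entry included) into one batch.
        List<Entry> batch = new ArrayList<>();
        for (Entry e = pending.poll(); e != null; e = pending.poll()) {
          batch.add(e);
        }
        List<T> updates = new ArrayList<>();
        for (Entry e : batch) {
          updates.add(e.update);
        }
        boolean ok = false;
        try {
          ok = writer.test(updates);
        } catch (RuntimeException ex) {
          // A write error fails every entry in this batch, not just the leader's.
        } finally {
          // Record the real outcome on every drained entry.
          for (Entry e : batch) {
            e.outcome.complete(ok);
          }
        }
      }
    }
    return mine.outcome.join();
  }
}
```

Callers block on `leaderLock` instead of busy-waiting. A caller whose entry was already written by an earlier leader finds its outcome completed when it acquires the lock, because the leader completes every drained entry before releasing it; otherwise its entry is still queued and it becomes the leader for the next batch.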