dinoocch commented on code in PR #14237: URL: https://github.com/apache/pinot/pull/14237#discussion_r1802060944
########## pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java: ########## @@ -124,35 +125,44 @@ public IdealState commit(HelixManager helixManager, String resourceName, Entry first = queue._pending.poll(); processed.add(first); String mergedResourceName = first._resourceName; - HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); - PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); - IdealState idealState = dataAccessor.getProperty(idealStateKey); - - // Make a copy of the idealState above to pass it to the updater - // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and - // list fields - IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); - - /** - * If the local cache does not contain a value, need to check if there is a - * value in ZK; use it as initial value if exists - */ - IdealState updatedIdealState = first._updater.apply(idealStateCopy); - first._updatedIdealState = updatedIdealState; - Iterator<Entry> it = queue._pending.iterator(); - while (it.hasNext()) { - Entry ent = it.next(); - if (!ent._resourceName.equals(mergedResourceName)) { - continue; + IdealState response = updateIdealState(helixManager, mergedResourceName, idealState -> { + IdealState updatedIdealState = first._updater.apply(idealState); + first._updatedIdealState = updatedIdealState; + first._exception = null; + if (processed.size() > 1) { + queue._pending.addAll(processed.subList(1, processed.size())); + processed.clear(); + processed.add(first); + } + Iterator<Entry> it = queue._pending.iterator(); + while (it.hasNext()) { + Entry ent = it.next(); + if (!ent._resourceName.equals(mergedResourceName)) { + continue; + } + processed.add(ent); + updatedIdealState = ent._updater.apply(updatedIdealState); + ent._updatedIdealState = updatedIdealState; + ent._exception = null; + it.remove(); } - processed.add(ent); - updatedIdealState = 
ent._updater.apply(idealStateCopy); - ent._updatedIdealState = updatedIdealState; - it.remove(); + return updatedIdealState; + }, retryPolicy, noChangeOk); + if (response == null) { + RuntimeException ex = new RuntimeException("Failed to update IdealState"); + for (Entry ent : processed) { + ent._exception = ex; + ent._updatedIdealState = null; + } + throw ex; + } + } catch (Throwable e) { + // If the update failed, we should re-add all entries to the queue + for (Entry ent : processed) { + ent._exception = e; + ent._updatedIdealState = null; } - IdealState finalUpdatedIdealState = updatedIdealState; - updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState, - retryPolicy, noChangeOk); + throw e; Review Comment: Right, so imagine your queue slot looks like `[A]`. Then some call to `commit(_, B, _)` is made, so the slot becomes `[A, B]`. Since nothing else is running, our thread drives the execution, starting with `A`. Now consider what happens if `A` fails and raises an exception. The queue will look like `[B]`, but our thread, which "owns" the `B` entry, will already have failed. Worse, some other thread will later come along and execute our commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org