slfan1989 commented on code in PR #4810:
URL: https://github.com/apache/hadoop/pull/4810#discussion_r956701424
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -1545,18 +1546,66 @@ private void cacheAllocatedContainers(List<Container>
containers,
+ " from same sub-cluster: {}, so ignoring.",
container.getId(), subClusterId);
} else {
+
+ LOG.info("Duplicate containerID found in the allocated containers. "
+
+ "try to re-pick the sub-cluster.");
+
// The same container allocation from different sub-clusters,
// something is wrong.
- // TODO: YARN-6667 if some subcluster RM is configured wrong, we
- // should not fail the entire heartbeat.
- throw new YarnRuntimeException(
- "Duplicate containerID found in the allocated containers. This"
- + " can happen if the RM epoch is not configured properly."
- + " ContainerId: " + container.getId().toString()
- + " ApplicationId: " + this.attemptId + " From RM: "
- + subClusterId
- + " . Previous container was from sub-cluster: "
- + existingSubClusterId);
+ try {
+
+ Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
+ SubClusterInfo existingSubCluster =
+ federationFacade.getSubCluster(existingSubClusterId);
+ SubClusterInfo newSubCluster =
federationFacade.getSubCluster(subClusterId);
+
+ boolean existAllocatedScHealth = true;
+ boolean newAllocatedScHealth = true;
+
+ // Previous SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
+ existingSubCluster == null ||
existingSubCluster.getState().isUnusable()) {
+ existAllocatedScHealth = false;
+ }
+
+ // New SubCluster Time Out Or Unusable, Can't Continue to use.
+ if (timeOutScs.contains(existingSubClusterId) ||
+ newSubCluster == null ||
newSubCluster.getState().isUnusable()) {
+ newAllocatedScHealth = false;
+ }
+
+ // If the previous RM which allocated Container is normal,
+ // the previous RM will be used first
+ if ((existAllocatedScHealth && !newAllocatedScHealth) ||
+ (existAllocatedScHealth && newAllocatedScHealth)) {
+ LOG.info("Use Previous Allocated Container's subCluster. " +
+ "ContainerId: {} ApplicationId: {} From RM: {}.",
this.attemptId,
+ container.getId(), existingSubClusterId);
+ continue;
Review Comment:
At the end of the for loop body, there is this line:
```
this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
```
If a container keeps its previous sub-cluster, this line must not run,
because it would replace the existing mapping with the new sub-cluster.
That is why `continue` is used here: it skips the rest of the current
iteration, and the for loop moves on to the next container.
This code may not be easy to read, so I will refactor this part.
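To illustrate the control flow, here is a minimal sketch with simplified,
hypothetical types: plain strings stand in for ContainerId and SubClusterId,
and `Allocation`, `cacheAllocations` and `unhealthySubClusters` are names
invented for this example only. It is not the code in FederationInterceptor.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ContainerMappingSketch {

  /** Hypothetical stand-in for a container reported by a sub-cluster. */
  public static final class Allocation {
    final String containerId;
    final String subClusterId;

    public Allocation(String containerId, String subClusterId) {
      this.containerId = containerId;
      this.subClusterId = subClusterId;
    }
  }

  /** Builds the container-to-sub-cluster map, preferring a healthy previous owner. */
  public static Map<String, String> cacheAllocations(
      List<Allocation> allocations, Set<String> unhealthySubClusters) {
    Map<String, String> containerToSubCluster = new LinkedHashMap<>();
    for (Allocation a : allocations) {
      String existing = containerToSubCluster.get(a.containerId);
      boolean duplicateFromOtherSubCluster =
          existing != null && !existing.equals(a.subClusterId);
      if (duplicateFromOtherSubCluster
          && !unhealthySubClusters.contains(existing)) {
        // The previous sub-cluster is still usable: keep its mapping and
        // skip the put() at the end of this iteration.
        continue;
      }
      // Reached for new containers, or when the previous owner is unusable.
      containerToSubCluster.put(a.containerId, a.subClusterId);
    }
    return containerToSubCluster;
  }

  public static void main(String[] args) {
    Map<String, String> result = cacheAllocations(
        Arrays.asList(
            new Allocation("c1", "sc1"),
            new Allocation("c1", "sc2")),
        Collections.<String>emptySet());
    System.out.println(result);
  }
}
```

In this sketch the second allocation of `c1` hits the `continue`, so the map
keeps `sc1` for that container.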
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]