slfan1989 commented on code in PR #4892:
URL: https://github.com/apache/hadoop/pull/4892#discussion_r985217417
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1456,28 +1460,178 @@ public Response cancelDelegationToken(HttpServletRequest hsr)
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
- throw new NotImplementedException("Code is not implemented");
+ long startTime = clock.getTime();
+
+ Map<SubClusterId, SubClusterInfo> subClustersActive;
+ try {
+ subClustersActive = federationFacade.getSubClusters(true);
+ } catch (YarnException e) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ return Response.status(Status.INTERNAL_SERVER_ERROR).
+ entity(e.getLocalizedMessage()).build();
+ }
+
+ List<SubClusterId> blacklist = new ArrayList<>();
+
+ for (int i = 0; i < numSubmitRetries; ++i) {
+ SubClusterId subClusterId = null;
+ try {
+ subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
+ SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ subClusterId, subClusterInfo.getRMWebServiceAddress());
+ Response response = interceptor.createNewReservation(hsr);
+ LOG.info("createNewReservation try #{} on SubCluster {}.", i, subClusterId);
+ if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime);
+ return response;
+ } else {
+ // Empty response from the ResourceManager.
+ // Blacklist this subCluster for this request.
+ blacklist.add(subClusterId);
+ }
+ } catch (YarnException e) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ subClustersActive.remove(subClusterId);
Review Comment:
Personally, I think removing `subClusterId` from `subClustersActive` and
adding `subClusterId` to `blacklist` are equivalent operations.
Within the `numSubmitRetries` loop, the following steps are performed:
1. Randomly pick a `subClusterId` that is not in the blacklist.
In this step, the blacklisted sub-clusters are also removed from the
active sub-cluster key set:
```
private SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubclusters,
List<SubClusterId> blackListSubClusters) throws YarnException {
...
if (blackListSubClusters != null) {
keySet.removeAll(blackListSubClusters);
}
List<SubClusterId> list = keySet.stream().collect(Collectors.toList());
return list.get(rand.nextInt(list.size()));
}
```
2. Using the selected `subClusterId`, call `createNewReservation`.
3. If step 2 succeeds, return the result; otherwise add `subClusterId` to
the blacklist.
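The three steps above can be sketched as a small, self-contained Java example. This is only an illustration under stated assumptions, not the code in this PR: `BlacklistRetrySketch`, `pickRandom`, and `callWithRetries` are hypothetical names, sub-cluster ids are generic keys, and the remote call is modelled as a plain `Function` that returns `null` or throws on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;

// Hypothetical sketch of "pick random, call, blacklist on failure".
final class BlacklistRetrySketch {

  // Step 1: pick a random id that is active and not blacklisted.
  // The key set is copied first, so the caller's map is never modified here.
  static <K, V> K pickRandom(Map<K, V> active, List<K> blacklist, Random rand) {
    List<K> candidates = new ArrayList<>(active.keySet());
    if (blacklist != null) {
      candidates.removeAll(blacklist);
    }
    if (candidates.isEmpty()) {
      throw new IllegalStateException("No active sub-cluster left to try");
    }
    return candidates.get(rand.nextInt(candidates.size()));
  }

  // Steps 2 and 3: call the chosen id; return on success, blacklist otherwise.
  // Returns null if every attempt failed or the retry budget ran out.
  static <K, V, R> R callWithRetries(Map<K, V> active, int maxRetries,
      Random rand, Function<K, R> call) {
    List<K> blacklist = new ArrayList<>();
    // Stop early once every active id has been blacklisted.
    for (int i = 0; i < maxRetries && blacklist.size() < active.size(); i++) {
      K id = pickRandom(active, blacklist, rand);
      try {
        R result = call.apply(id);
        if (result != null) {
          return result;
        }
      } catch (RuntimeException e) {
        // A failed call is handled the same way as an empty response.
      }
      blacklist.add(id);
    }
    return null;
  }
}
```

In this sketch the blacklist alone is enough to keep a failed id from being picked again, so the active map never has to be modified.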
> Also in the blacklist no?
I read this part of the code carefully. When a call to a `subClusterId`
fails, putting it into the blacklist is the better choice. Removing the
`subClusterId` directly from the active sub-clusters would also work, but
adding it to the blacklist seems cleaner.
> And are we sure subClustersActive contains subClusterId?
This should be guaranteed: this part of the code runs serially, with no
interference from other threads, so the `subClusterId` will still be present
and the remove will succeed.
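One related detail: in the hunk above `subClusterId` starts as `null`, so if the selection call itself throws, the catch block ends up calling `remove(null)`. Whether that is harmless depends on the map implementation, which is not visible in this diff. The sketch below only illustrates standard `java.util` behavior; `RemoveSemanticsSketch` and `removeWithoutError` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper showing how Map.remove treats absent and null keys.
final class RemoveSemanticsSketch {

  // Returns true if map.remove(key) completed, false if it threw an NPE.
  static boolean removeWithoutError(Map<String, String> map, String key) {
    try {
      map.remove(key);
      return true;
    } catch (NullPointerException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Map<String, String> hashMap = new HashMap<>();
    hashMap.put("sc1", "rm-address-1");
    Map<String, String> concurrentMap = new ConcurrentHashMap<>(hashMap);

    // Removing an absent key is a no-op for both implementations.
    System.out.println(removeWithoutError(hashMap, "missing"));
    System.out.println(removeWithoutError(concurrentMap, "missing"));

    // HashMap tolerates a null key; ConcurrentHashMap rejects it.
    System.out.println(removeWithoutError(hashMap, null));
    System.out.println(removeWithoutError(concurrentMap, null));
  }
}
```

Adding to the blacklist only when an id was actually selected avoids depending on this difference.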
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]