goiri commented on code in PR #4892:
URL: https://github.com/apache/hadoop/pull/4892#discussion_r982893961


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable 
t, String errMsgFormat,
       throw new RuntimeException(msg);
     }
   }
+
+  /**
+   * Save Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void addReservationHomeSubCluster(FederationStateStoreFacade 
federationFacade,
+      ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) 
throws YarnException {
+    try {
+      // persist the mapping of reservationId and the subClusterId which has
+      // been selected as its home
+      federationFacade.addReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to insert the ReservationId %s into the 
FederationStateStore.", reservationId);
+    }
+  }
+
+  /**
+   * Update Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param subClusterId subClusterId
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void 
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+      SubClusterId subClusterId, ReservationId reservationId,
+      ReservationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      // update the mapping of reservationId and the home subClusterId to
+      // the new subClusterId we have selected
+      federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      SubClusterId subClusterIdInStateStore =
+          federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId == subClusterIdInStateStore) {
+        LOG.info("Reservation {} already submitted on SubCluster {}.", 
reservationId, subClusterId);
+      } else {
+        RouterServerUtil.logAndThrowException(e,
+            "Unable to update the ReservationId %s into the 
FederationStateStore.", reservationId);
+      }
+    }
+  }
+
+  /**
+   * Checks whether a home sub-cluster mapping exists for a reservation.
+   *
+   * @param federationFacade federation facade used for the lookup
+   * @param reservationId id of the reservation to look up
+   * @return true if the lookup yields a non-null sub-cluster id; false if it
+   *         yields null or fails with a YarnException (which is logged)
+   */
+  public static Boolean existsReservationHomeSubCluster(
+      FederationStateStoreFacade federationFacade, ReservationId reservationId) {
+    try {
+      SubClusterId homeSubClusterId =
+          federationFacade.getReservationHomeSubCluster(reservationId);
+      return homeSubClusterId != null;
+    } catch (YarnException ex) {
+      // A failed lookup is reported as "does not exist" rather than propagated.
+      LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, ex);
+      return false;
+    }
+  }
+
+  public static ReservationDefinition convertReservationDefinition(
+      ReservationDefinitionInfo definitionInfo) {
+
+    // basic variable
+    long arrival = definitionInfo.getArrival();
+    long deadline = definitionInfo.getDeadline();
+
+    // ReservationRequests reservationRequests
+    String name = definitionInfo.getReservationName();
+    String recurrenceExpression = definitionInfo.getRecurrenceExpression();
+    Priority priority = Priority.newInstance(definitionInfo.getPriority());
+
+    // reservation requests info
+    List<ReservationRequest> reservationRequestList = new ArrayList<>();
+
+    ReservationRequestsInfo reservationRequestsInfo = 
definitionInfo.getReservationRequests();
+
+    List<ReservationRequestInfo> reservationRequestInfos =
+        reservationRequestsInfo.getReservationRequest();
+
+    for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
+      ResourceInfo resourceInfo = resRequestInfo.getCapability();
+      Resource capability =
+          Resource.newInstance(resourceInfo.getMemorySize(), 
resourceInfo.getvCores());
+      ReservationRequest reservationRequest = 
ReservationRequest.newInstance(capability,
+          resRequestInfo.getNumContainers(), 
resRequestInfo.getMinConcurrency(),
+          resRequestInfo.getDuration());
+      reservationRequestList.add(reservationRequest);
+    }
+
+    ReservationRequestInterpreter[] values = 
ReservationRequestInterpreter.values();
+    ReservationRequestInterpreter reservationRequestInterpreter =
+        values[reservationRequestsInfo.getReservationRequestsInterpreter()];
+    ReservationRequests reservationRequests =
+        ReservationRequests.newInstance(reservationRequestList, 
reservationRequestInterpreter);
+
+    ReservationDefinition definition =

Review Comment:
   This?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1456,28 +1460,177 @@ public Response 
cancelDelegationToken(HttpServletRequest hsr)
   @Override
   public Response createNewReservation(HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
-    throw new NotImplementedException("Code is not implemented");
+    long startTime = clock.getTime();
+
+    Map<SubClusterId, SubClusterInfo> subClustersActive;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      return Response.status(Status.INTERNAL_SERVER_ERROR).
+          entity(e.getLocalizedMessage()).build();
+    }
+
+    List<SubClusterId> blacklist = new ArrayList<>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = null;
+      try {
+        subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
+        SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId);
+        DefaultRequestInterceptorREST interceptor = 
getOrCreateInterceptorForSubCluster(
+            subClusterId, subClusterInfo.getRMWebServiceAddress());
+        Response response = interceptor.createNewReservation(hsr);
+        LOG.info("createNewReservation try #{} on SubCluster {}.", i, 
subClusterId);
+        if (response != null && response.getStatus() == 
HttpServletResponse.SC_OK) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        } else {
+          // Empty response from the ResourceManager.
+          // Blacklist this subCluster for this request.
+          blacklist.add(subClusterId);
+        }
+      } catch (YarnException e) {
+        routerMetrics.incrGetNewReservationFailedRetrieved();

Review Comment:
   Should we also blacklist this subClusterId here, in the catch branch, if it is available?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to