goiri commented on code in PR #4656:
URL: https://github.com/apache/hadoop/pull/4656#discussion_r934901461


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java:
##########
@@ -177,11 +185,65 @@ public void setHomeSubCluster(SubClusterId 
homeSubCluster) {
   public void setMockActiveSubclusters(int numSubclusters) {
     for (int i = 1; i <= numSubclusters; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
-      SubClusterInfo sci = mock(SubClusterInfo.class);
-      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-      when(sci.getSubClusterId()).thenReturn(sc.toId());
+      SubClusterInfo sci = SubClusterInfo.newInstance(sc.toId(),
+          "dns1:80", "dns1:81", "dns1:82", "dns1:83", 
SubClusterState.SC_RUNNING,
+          System.currentTimeMillis(), "something");
       getActiveSubclusters().put(sc.toId(), sci);
     }
   }
 
+  public String generateClusterMetricsInfo(int id) {
+    long mem = 1024 * getRand().nextInt(277 * 100 - 1);
+    // plant a best cluster
+    if (id == 5) {
+      mem = 1024 * 277 * 100;
+    }
+    String clusterMetrics =
+            "{\"clusterMetrics\":{\"appsSubmitted\":65," + 
"\"appsCompleted\":64,"
+                    + "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0,"
+                    + "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + 
mem + ","
+                    + "\"allocatedMB\":0,\"reservedVirtualCores\":0,"
+                    + 
"\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0,"
+                    + "\"containersAllocated\":0,\"containersReserved\":0,"
+                    + "\"containersPending\":0,\"totalMB\":28364800,"
+                    + 
"\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1,"
+                    + "\"unhealthyNodes\":0,\"decommissionedNodes\":0,"
+                    + "\"rebootedNodes\":0,\"activeNodes\":277}}\n";
+
+    return clusterMetrics;
+  }
+
+  public FederationStateStoreFacade getMemoryFacade() throws YarnException {
+
+    // setting up a store and its facade (with caching off)
+    FederationStateStoreFacade fedFacade =

Review Comment:
   One line



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java:
##########
@@ -177,11 +185,65 @@ public void setHomeSubCluster(SubClusterId 
homeSubCluster) {
   public void setMockActiveSubclusters(int numSubclusters) {
     for (int i = 1; i <= numSubclusters; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
-      SubClusterInfo sci = mock(SubClusterInfo.class);
-      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-      when(sci.getSubClusterId()).thenReturn(sc.toId());
+      SubClusterInfo sci = SubClusterInfo.newInstance(sc.toId(),
+          "dns1:80", "dns1:81", "dns1:82", "dns1:83", 
SubClusterState.SC_RUNNING,
+          System.currentTimeMillis(), "something");
       getActiveSubclusters().put(sc.toId(), sci);
     }
   }
 
+  public String generateClusterMetricsInfo(int id) {
+    long mem = 1024 * getRand().nextInt(277 * 100 - 1);
+    // plant a best cluster
+    if (id == 5) {
+      mem = 1024 * 277 * 100;
+    }
+    String clusterMetrics =
+            "{\"clusterMetrics\":{\"appsSubmitted\":65," + 
"\"appsCompleted\":64,"
+                    + "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0,"
+                    + "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + 
mem + ","
+                    + "\"allocatedMB\":0,\"reservedVirtualCores\":0,"
+                    + 
"\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0,"
+                    + "\"containersAllocated\":0,\"containersReserved\":0,"
+                    + "\"containersPending\":0,\"totalMB\":28364800,"
+                    + 
"\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1,"
+                    + "\"unhealthyNodes\":0,\"decommissionedNodes\":0,"
+                    + "\"rebootedNodes\":0,\"activeNodes\":277}}\n";
+
+    return clusterMetrics;
+  }
+
+  public FederationStateStoreFacade getMemoryFacade() throws YarnException {
+
+    // setting up a store and its facade (with caching off)
+    FederationStateStoreFacade fedFacade =
+            FederationStateStoreFacade.getInstance();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0");

Review Comment:
   setInt?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java:
##########
@@ -63,4 +71,108 @@ public void validate(ApplicationSubmissionContext 
appSubmissionContext)
     }
   }
 
+  /**
+   * This method is implemented by the specific policy, and it is used to route
+   * both reservations, and applications among a given set of
+   * sub-clusters.
+   *
+   * @param queue the queue for this application/reservation
+   * @param preSelectSubClusters a pre-filter set of sub-clusters
+   * @return the chosen sub-cluster
+   *
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  protected abstract SubClusterId chooseSubCluster(String queue,
+      Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws 
YarnException;
+
+  /**
+   * Filter chosen SubCluster based on reservationId.
+   *
+   * @param reservationId the globally unique identifier for a reservation.
+   * @param activeSubClusters the map of ids to info for all active 
subclusters.
+   * @return the chosen sub-cluster
+   * @throws YarnException if the policy fails to choose a sub-cluster
+   */
+  protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
+      ReservationId reservationId, Map<SubClusterId, SubClusterInfo> 
activeSubClusters)
+      throws YarnException {
+
+    // if a reservation exists limit scope to the sub-cluster this
+    // reservation is mapped to
+    // TODO: Implemented in YARN-11236
+    return activeSubClusters;
+  }
+
+  /**
+   * Simply picks from alphabetically-sorted active subclusters based on the
+   * hash of quey name. Jobs of the same queue will all be routed to the same
+   * sub-cluster, as far as the number of active sub-cluster and their names
+   * remain the same.
+   *
+   * @param appContext the {@link ApplicationSubmissionContext} that
+   *          has to be routed to an appropriate subCluster for execution.
+   *
+   * @param blackLists the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
+   *
+   * @return a hash-based chosen {@link SubClusterId} that will be the "home"
+   *         for this application.
+   *
+   * @throws YarnException if there are no active subclusters.
+   */
+  @Override
+  public SubClusterId getHomeSubcluster(ApplicationSubmissionContext 
appContext,
+      List<SubClusterId> blackLists) throws YarnException {
+
+    // null checks and default-queue behavior
+    validate(appContext);
+
+    // apply filtering based on reservation location and active sub-clusters
+    Map<SubClusterId, SubClusterInfo> filteredSubClusters = 
prefilterSubClusters(
+            appContext.getReservationID(), getActiveSubclusters());
+
+    FederationPolicyUtils.validateSubClusterAvailability(

Review Comment:
   Can we have `validateSubClusterAvailability` taking Collection so we don't 
need temp data structures?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java:
##########
@@ -46,12 +46,15 @@ public void setUp() throws Exception {
     Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
     Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
 
+    long now = System.currentTimeMillis();

Review Comment:
   Time.now()?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java:
##########
@@ -54,10 +54,11 @@ public void setUp() throws Exception {
 
       // with 5% omit a subcluster
       if (getRand().nextFloat() < 0.95f || i == 5) {
-        SubClusterInfo sci = mock(SubClusterInfo.class);
-        when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
-        when(sci.getSubClusterId()).thenReturn(sc.toId());
-        getActiveSubclusters().put(sc.toId(), sci);
+        long now = System.currentTimeMillis();

Review Comment:
   Time.now()



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java:
##########
@@ -115,4 +117,14 @@ public void testAllBlacklistSubcluster() throws 
YarnException {
       }
     }
   }
+
+  @Test
+  public void testNullReservationContext() throws Exception {
+    FederationRouterPolicy policy =  ((FederationRouterPolicy) getPolicy());

Review Comment:
   Too many spaces after the =



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java:
##########
@@ -131,7 +133,7 @@ public void testIfNoSubclustersWithWeightOne() {
       fail();
     } catch (YarnException ex) {
       Assert.assertTrue(

Review Comment:
   LambdaTestUtils?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java:
##########
@@ -159,6 +159,50 @@ public static void initializePolicyContext(
         new Configuration());
   }
 
+  public static FederationPolicyInitializationContext initializePolicyContext2(
+      ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
+      Map<SubClusterId, SubClusterInfo> activeSubClusters,
+      FederationStateStoreFacade facade) throws YarnException {
+    FederationPolicyInitializationContext context =
+        new FederationPolicyInitializationContext(null, initResolver(), facade,
+        SubClusterId.newInstance("homesubcluster"));
+    return initializePolicyContext2(context, policy, policyInfo, 
activeSubClusters);
+  }
+
+  public static FederationPolicyInitializationContext initializePolicyContext2(
+      ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
+      Map<SubClusterId, SubClusterInfo> activeSubClusters)
+      throws YarnException {
+    return initializePolicyContext2(policy, policyInfo, activeSubClusters, 
initFacade());
+  }
+
+  public static FederationPolicyInitializationContext initializePolicyContext2(
+      FederationPolicyInitializationContext fpc,
+      ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
+      Map<SubClusterId, SubClusterInfo> activeSubClusters)
+      throws YarnException {
+    ByteBuffer buf = policyInfo.toByteBuffer();
+    fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
+        .newInstance("queue1", policy.getClass().getCanonicalName(), buf));
+
+    if (fpc.getFederationStateStoreFacade() == null) {
+      FederationStateStoreFacade facade = 
FederationStateStoreFacade.getInstance();
+      FederationStateStore fss = mock(FederationStateStore.class);
+
+      if (activeSubClusters == null) {
+        activeSubClusters = new HashMap<>();
+      }
+      GetSubClustersInfoResponse response =

Review Comment:
   Can we create a generic version of GetSubClustersInfoResponse.newInstance?
   One that takes Collection<String>



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java:
##########
@@ -36,6 +37,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+

Review Comment:
   Avoid



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to