I am trying to define several YARN queues so that the cluster is always fully used, and when a new task is submitted to a different queue, the resources are divided between the queues (i.e., some of the workers in the first queue are preempted).
For that purpose I use the FairScheduler and rely on the documentation: Hadoop-FairScheduler <https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-site/FairScheduler.html> and Cloudera-FairScheduler <https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_fair_scheduler.html>.

I run YARN and Spark from Ambari, and the relevant settings are:

In yarn-site.xml:

    yarn.resourcemanager.scheduler.monitor.enable=true
    yarn.resourcemanager.scheduler.class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler

I define the queues in fair-scheduler.xml:

    <?xml version="1.0"?>
    <allocations>
      <defaultMinSharePreemptionTimeout>1</defaultMinSharePreemptionTimeout>
      <defaultFairSharePreemptionTimeout>1</defaultFairSharePreemptionTimeout>
      <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
      <defaultFairSharePreemptionThreshold>0.5</defaultFairSharePreemptionThreshold>

      <queue name="team1" type="parent">
        <minResources>20000 mb,2vcores</minResources>
        <weight>1.0</weight>
      </queue>
      <queue name="team2" type="parent">
        <minResources>20000 mb,2vcores</minResources>
      </queue>
      <queue name="team3" type="parent">
        <minResources>20000 mb,2vcores</minResources>
        <fairSharePreemptionThreshold>1.0</fairSharePreemptionThreshold>
        <weight>10.0</weight>
      </queue>

      <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
      <queueMaxResourcesDefault>40000 mb,0vcores</queueMaxResourcesDefault>

      <!-- Queue 'secondary_group_queue' is a parent queue and may have user queues under it -->

      <user name="sample_user">
        <maxRunningApps>30</maxRunningApps>
      </user>
      <userMaxAppsDefault>5</userMaxAppsDefault>

      <queuePlacementPolicy>
        <rule name="specified" />
        <rule name="primaryGroup" create="false" />
        <rule name="nestedUserQueue">
          <rule name="secondaryGroupExistingQueue" create="false" />
        </rule>
        <rule name="default" queue="team5"/>
      </queuePlacementPolicy>
    </allocations>

The application I run is a simple variation of the calcPi example (a while loop that keeps estimating pi):

    while (true) {
        // After the first iteration, getOrCreate() returns the existing session
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaPipelineExample")
                .getOrCreate();

        // NUM_SAMPLES is a constant defined elsewhere in the class
        List<Integer> l = new ArrayList<>(NUM_SAMPLES);
        for (int i = 0; i < NUM_SAMPLES; i++) {
            l.add(i);
        }

        JavaRDD<Integer> inputRDD = new JavaSparkContext(spark.sparkContext())
                .parallelize(l)
                .coalesce(100)
                .repartition(100);
        System.out.println(String.format("Data split to %s partitions", inputRDD.partitions().size()));

        // Count random points that fall inside the unit quarter-circle
        long count = inputRDD.filter(i -> {
            double x = Math.random();
            double y = Math.random();
            return x * x + y * y < 1;
        }).count();
        System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
    }

To run it, I open two terminals. First I start the first application (in queue team2.aa), check that it has occupied all the resources, and then start the second application (in queue team1.aa). I expect the scheduler to preempt containers of the first application and give some of its resources to the second queue, but that doesn't happen.

Run application 1:

    /usr/hdp/current/spark2-client/bin/spark-submit --master yarn --class com.comp.CalculatePi --num-executors 25 --executor-cores 6 --queue team2.aa /root/calcpi-1.0-SNAPSHOT.jar

After starting it, the YARN management panel shows that 37/54 vcores are in use. Then I run application 2:

    /usr/hdp/current/spark2-client/bin/spark-submit --master yarn --class com.comp.CalculatePi --num-executors 10 --executor-cores 4 --queue team1.aa /root/calcpi-1.0-SNAPSHOT.jar

Now 38/54 vcores are in use. The second application is submitted successfully, but it never starts, and I get this warning:

    [Timer-0] WARN org.apache.spark.scheduler.cluster.YarnScheduler - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

What is the problem here? Why doesn't YARN run the two applications together and preempt the first one?
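For reference, this is a minimal Java sketch of the fair-share preemption condition as the Hadoop FairScheduler documentation describes it: if a queue waits `fairSharePreemptionTimeout` seconds without receiving `fairSharePreemptionThreshold` times its fair share, it is allowed to preempt containers from other queues. It is a simplified model under stated assumptions, not the scheduler's implementation. The class and method names are hypothetical, preemption is assumed to be active, fair shares are split by weight among active queues only over a single resource (memory, which is what the `fair` policy compares), and minResources, maxResources, the team1/team2 parent hierarchy and the min-share timeout are all ignored. The cluster size and usage figures in `main` are made up; the threshold (0.5), timeout (1 s) and weights come from the allocation file in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the FairScheduler fair-share preemption check.
 * Ignores minResources, maxResources, queue hierarchy and the min-share timeout.
 */
public class FairSharePreemptionSketch {

    /** Splits one resource (e.g. memory in MB) among active queues in proportion to their weights. */
    public static Map<String, Double> fairShares(double clusterCapacity, Map<String, Double> weights) {
        double totalWeight = 0.0;
        for (double w : weights.values()) {
            totalWeight += w;
        }
        Map<String, Double> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : weights.entrySet()) {
            shares.put(e.getKey(), clusterCapacity * e.getValue() / totalWeight);
        }
        return shares;
    }

    /**
     * A queue becomes eligible to preempt once it has stayed below
     * threshold * fairShare for at least the fair-share preemption timeout.
     */
    public static boolean mayPreempt(double usage, double fairShare, double threshold,
                                     double secondsStarved, double timeoutSeconds) {
        return usage < threshold * fairShare && secondsStarved >= timeoutSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical cluster of 100000 MB; only team1 and team2 have running apps.
        Map<String, Double> weights = new LinkedHashMap<>();
        weights.put("team1", 1.0); // <weight>1.0</weight> in fair-scheduler.xml
        weights.put("team2", 1.0); // no weight set, so the default of 1 applies
        Map<String, Double> shares = fairShares(100000.0, weights);
        double team1Share = shares.get("team1"); // half of the cluster: 50000 MB

        // Hypothetical: team1 holds only 2000 MB (its AM) and has waited 5 s.
        // Threshold 0.5 and timeout 1 s are the allocation-file defaults.
        boolean preempt = mayPreempt(2000.0, team1Share, 0.5, 5.0, 1.0);
        System.out.println("team1 fair share: " + team1Share + " MB");
        System.out.println("team1 may preempt: " + preempt);
    }
}
```

With the question's threshold of 0.5, a queue whose fair share is half the cluster only needs a quarter of the cluster before it stops being eligible to preempt, so the sketch also shows why the threshold value matters as much as the timeout.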
