This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new 0f81c131fc KYLIN-5372 If the Job occurs permission exception error ,then let the Job failed instead of retry.(#29259) 0f81c131fc is described below commit 0f81c131fc2e11b654b7dd2e7290213e41a6ff90 Author: huangsheng <huangshen...@163.com> AuthorDate: Sat Oct 29 10:48:23 2022 +0800 KYLIN-5372 If the Job occurs permission exception error ,then let the Job failed instead of retry.(#29259) KYLIN-5372 If the Job occurs permission exception ,then let the Job failed instead of retry. --- .../engine/spark/application/SparkApplication.java | 5 + .../org/apache/spark/application/JobWorker.scala | 8 +- .../apache/spark/application/TestJobMonitor.scala | 2 +- .../apache/spark/application/TestJobWorker.scala | 141 ++++++++++++++++++++- 4 files changed, 153 insertions(+), 3 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 9cc144902d..2e8a503156 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; import org.apache.kylin.cluster.IClusterManager; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; @@ -324,6 +325,10 @@ public abstract class SparkApplication implements Application { } protected void handleException(Exception e) throws Exception { + if (e instanceof AccessControlException) { + logger.error("Permission denied.", e); + throw new NoRetryException("Permission denied."); + } throw e; } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala index 069ac171cc..77b8834f02 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala @@ -18,6 +18,7 @@ package org.apache.spark.application + import java.util.concurrent.Executors import org.apache.kylin.engine.spark.application.SparkApplication @@ -48,7 +49,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K execute() } - private def execute(): Unit = { pool.execute(new Runnable { override def run(): Unit = { @@ -56,6 +56,12 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K application.execute(args) eventLoop.post(JobSucceeded()) } catch { + // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String]) + case runtimeException: RuntimeException => + runtimeException.getCause match { + case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException)) + case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) + } case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception)) case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala index b9a1ff87c2..1864d8749c 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala @@ -22,6 +22,7 @@ import java.util import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean import com.amazonaws.services.s3.model.AmazonS3Exception +import org.apache.hadoop.security.AccessControlException import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo} import org.apache.kylin.common.KylinConfig import org.apache.kylin.engine.spark.job.KylinBuildEnv @@ -307,7 +308,6 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach { } } - test("post JobFailed event when receive class not found event") { withEventLoop { eventLoop => Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1) diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala index 73035a24c3..64afc26b85 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala @@ -18,9 +18,10 @@ package org.apache.spark.application +import org.apache.hadoop.security.AccessControlException + import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean - import org.apache.kylin.engine.spark.application.SparkApplication import org.apache.kylin.engine.spark.scheduler._ import org.apache.spark.scheduler.KylinJobEventLoop @@ -53,6 +54,30 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter { eventLoop.stop() } + test("post ResourceLack event when job failed with runtime exception for lack of resource") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receiveResourceLack = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[ResourceLack]) { + receiveResourceLack.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and ResourceLack + latch.await() + assert(receiveResourceLack.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + test("post JobSucceeded event when job succeeded") { val eventLoop = new KylinJobEventLoop eventLoop.start() @@ -100,6 +125,78 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter { worker.stop() eventLoop.stop() } + + test("post Permission denied event when PermissionDenied occurred with handle Exception function") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new PermissionDeniedJobWithHandleException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await() + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post Permission denied event when RuntimeException occurred") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new PermissionDeniedJobWithRuntimeException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await() + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post Permission denied event when AccessControlException occurred") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await() + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } } class UnknownThrowableJob extends SparkApplication { @@ -110,6 +207,35 @@ class UnknownThrowableJob extends SparkApplication { override protected def doExecute(): Unit = {} } +class PermissionDeniedJobWithHandleException extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new AccessControlException() + } catch { + case e : Exception => handleException(e) + } + } + override protected def doExecute(): Unit = {} +} + +class PermissionDeniedJobWithRuntimeException extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new AccessControlException() + } catch { + case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied.")) + } + } + override protected def doExecute(): Unit = {} +} + +class PermissionDeniedJobWithNoRetryException extends SparkApplication { + override def execute(args: Array[String]): Unit = { + throw new NoRetryException("Permission Denied") + } + override protected def doExecute(): Unit = {} +} + class ResourceLackJob extends SparkApplication { override def execute(args: Array[String]): Unit = { @@ -119,6 +245,19 @@ class ResourceLackJob extends SparkApplication { override protected def doExecute(): Unit = {} } +class ResourceLackJobWithRuntimeException extends SparkApplication { + + override def execute(args: Array[String]): Unit = { + try { + throw new Exception() + } catch { + case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e) + } + } + + override protected def doExecute(): Unit = {} +} + class MockSucceedJob extends SparkApplication { override def execute(args: Array[String]): Unit = {}