This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1edec930c1c6331d4c98452061c8820f2d70f4ae Author: huangsheng <huangshen...@163.com> AuthorDate: Fri Nov 4 16:37:34 2022 +0800 Revert KYLIN-5372 due to this fix cannot override snapshot build scenarios --- .../engine/spark/application/SparkApplication.java | 5 - .../org/apache/spark/application/JobWorker.scala | 8 +- .../apache/spark/application/TestJobMonitor.scala | 2 +- .../apache/spark/application/TestJobWorker.scala | 141 +-------------------- 4 files changed, 3 insertions(+), 153 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 2e8a503156..9cc144902d 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.AccessControlException; import org.apache.kylin.cluster.IClusterManager; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; @@ -325,10 +324,6 @@ public abstract class SparkApplication implements Application { } protected void handleException(Exception e) throws Exception { - if (e instanceof AccessControlException) { - logger.error("Permission denied.", e); - throw new NoRetryException("Permission denied."); - } throw e; } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala index 77b8834f02..069ac171cc 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala @@ -18,7 +18,6 @@ package org.apache.spark.application - import java.util.concurrent.Executors import org.apache.kylin.engine.spark.application.SparkApplication @@ -49,6 +48,7 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K execute() } + private def execute(): Unit = { pool.execute(new Runnable { override def run(): Unit = { @@ -56,12 +56,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K application.execute(args) eventLoop.post(JobSucceeded()) } catch { - // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String]) - case runtimeException: RuntimeException => - runtimeException.getCause match { - case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException)) - case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) - } case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception)) case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala index 1864d8749c..b9a1ff87c2 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala @@ -22,7 +22,6 @@ import java.util import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean import com.amazonaws.services.s3.model.AmazonS3Exception -import org.apache.hadoop.security.AccessControlException import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo} import org.apache.kylin.common.KylinConfig import org.apache.kylin.engine.spark.job.KylinBuildEnv @@ -308,6 +307,7 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach { } } + test("post JobFailed event when receive class not found event") { withEventLoop { eventLoop => Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1) diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala index 64afc26b85..73035a24c3 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala @@ -18,10 +18,9 @@ package org.apache.spark.application -import org.apache.hadoop.security.AccessControlException - import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean + import org.apache.kylin.engine.spark.application.SparkApplication import org.apache.kylin.engine.spark.scheduler._ import org.apache.spark.scheduler.KylinJobEventLoop @@ -54,30 +53,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter { eventLoop.stop() } - test("post ResourceLack event when job failed with runtime exception for lack of resource") { - val eventLoop = new KylinJobEventLoop - eventLoop.start() - val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop) - val latch = new CountDownLatch(2) - val receiveResourceLack = new AtomicBoolean(false) - val listener = new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - if (event.isInstanceOf[ResourceLack]) { - receiveResourceLack.getAndSet(true) - } - latch.countDown() - } - } - eventLoop.registerListener(listener) - eventLoop.post(RunJob()) - // receive RunJob and ResourceLack - latch.await() - assert(receiveResourceLack.get()) - eventLoop.unregisterListener(listener) - worker.stop() - eventLoop.stop() - } - test("post JobSucceeded event when job succeeded") { val eventLoop = new KylinJobEventLoop eventLoop.start() @@ -125,78 +100,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter { worker.stop() eventLoop.stop() } - - test("post Permission denied event when PermissionDenied occurred with handle Exception function") { - val eventLoop = new KylinJobEventLoop - eventLoop.start() - val worker = new JobWorker(new PermissionDeniedJobWithHandleException(), Array.empty, eventLoop) - val latch = new CountDownLatch(2) - val receivePermissionDenied = new AtomicBoolean(false) - val listener = new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - if (event.isInstanceOf[UnknownThrowable]) { - receivePermissionDenied.getAndSet(true) - } - latch.countDown() - } - } - eventLoop.registerListener(listener) - eventLoop.post(RunJob()) - // receive RunJob and PermissionDenied - latch.await() - assert(receivePermissionDenied.get()) - eventLoop.unregisterListener(listener) - worker.stop() - eventLoop.stop() - } - - test("post Permission denied event when RuntimeException occurred") { - val eventLoop = new KylinJobEventLoop - eventLoop.start() - val worker = new JobWorker(new PermissionDeniedJobWithRuntimeException(), Array.empty, eventLoop) - val latch = new CountDownLatch(2) - val receivePermissionDenied = new AtomicBoolean(false) - val listener = new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - if (event.isInstanceOf[UnknownThrowable]) { - receivePermissionDenied.getAndSet(true) - } - latch.countDown() - } - } - eventLoop.registerListener(listener) - eventLoop.post(RunJob()) - // receive RunJob and PermissionDenied - latch.await() - assert(receivePermissionDenied.get()) - eventLoop.unregisterListener(listener) - worker.stop() - eventLoop.stop() - } - - test("post Permission denied event when AccessControlException occurred") { - val eventLoop = new KylinJobEventLoop - eventLoop.start() - val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop) - val latch = new CountDownLatch(2) - val receivePermissionDenied = new AtomicBoolean(false) - val listener = new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - if (event.isInstanceOf[UnknownThrowable]) { - receivePermissionDenied.getAndSet(true) - } - latch.countDown() - } - } - eventLoop.registerListener(listener) - eventLoop.post(RunJob()) - // receive RunJob and PermissionDenied - latch.await() - assert(receivePermissionDenied.get()) - eventLoop.unregisterListener(listener) - worker.stop() - eventLoop.stop() - } } class UnknownThrowableJob extends SparkApplication { @@ -207,35 +110,6 @@ class UnknownThrowableJob extends SparkApplication { override protected def doExecute(): Unit = {} } -class PermissionDeniedJobWithHandleException extends SparkApplication { - override def execute(args: Array[String]): Unit = { - try { - throw new AccessControlException() - } catch { - case e : Exception => handleException(e) - } - } - override protected def doExecute(): Unit = {} -} - -class PermissionDeniedJobWithRuntimeException extends SparkApplication { - override def execute(args: Array[String]): Unit = { - try { - throw new AccessControlException() - } catch { - case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied.")) - } - } - override protected def doExecute(): Unit = {} -} - -class PermissionDeniedJobWithNoRetryException extends SparkApplication { - override def execute(args: Array[String]): Unit = { - throw new NoRetryException("Permission Denied") - } - override protected def doExecute(): Unit = {} -} - class ResourceLackJob extends SparkApplication { override def execute(args: Array[String]): Unit = { @@ -245,19 +119,6 @@ class ResourceLackJob extends SparkApplication { override protected def doExecute(): Unit = {} } -class ResourceLackJobWithRuntimeException extends SparkApplication { - - override def execute(args: Array[String]): Unit = { - try { - throw new Exception() - } catch { - case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e) - } - } - - override protected def doExecute(): Unit = {} -} - class MockSucceedJob extends SparkApplication { override def execute(args: Array[String]): Unit = {}