This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ada7c4ee26cba3523d2af78039b1f72e5becc507
Author: luoping.zhang <luoping.zh...@kyligence.io>
AuthorDate: Fri Mar 31 15:07:54 2023 +0800

    KYLIN-5634 Query node capacity expansion and reduction
---
 .../rest/controller/QueryResourceController.java   | 59 +++++++++++++
 .../controller/QueryResourceControllerTest.java    | 82 ++++++++++++++++++
 .../kylin/rest/service/QueryResourceService.java   | 90 ++++++++++++++++++++
 .../rest/service/QueryResourceServiceTest.java     | 96 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparderEnv.scala    | 23 +++++-
 5 files changed, 348 insertions(+), 2 deletions(-)

diff --git 
a/src/query-server/src/main/java/org/apache/kylin/rest/controller/QueryResourceController.java
 
b/src/query-server/src/main/java/org/apache/kylin/rest/controller/QueryResourceController.java
new file mode 100644
index 0000000000..768f8ad102
--- /dev/null
+++ 
b/src/query-server/src/main/java/org/apache/kylin/rest/controller/QueryResourceController.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.controller;
+
+import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+
+import org.apache.kylin.rest.service.QueryResourceService;
+import org.apache.kylin.rest.service.QueryResourceService.QueryResource;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(value = "/api/resource/query", produces = { 
HTTP_VND_APACHE_KYLIN_JSON,
+        HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON })
+public class QueryResourceController { // REST endpoints under /api/resource/query, backed by QueryResourceService
+    // Injected service; each endpoint checks isAvailable() on it before delegating.
+    @Autowired
+    private QueryResourceService queryResourceService;
+    /** PUT /adjust: forwards the request body to the service when it is available, otherwise returns a default-constructed QueryResource. */
+    @PutMapping(value = "/adjust")
+    @ResponseBody
+    public QueryResourceService.QueryResource adjustQueryResource(@RequestBody 
QueryResource resource) {
+        if (queryResourceService.isAvailable()) {
+            return queryResourceService.adjustQueryResource(resource);
+        }
+        return new QueryResource(); // service unavailable: no-arg instance, fields keep their Java defaults
+    }
+    /** GET /executor: returns the executor count from the service when it is available, otherwise -1. */
+    @GetMapping(value = "/executor")
+    @ResponseBody
+    public int getExecutorSize() {
+        if (queryResourceService.isAvailable()) {
+            return queryResourceService.getExecutorSize();
+        }
+        return -1; // sentinel returned when the service is unavailable
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/QueryResourceControllerTest.java
 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/QueryResourceControllerTest.java
new file mode 100644
index 0000000000..ff881db697
--- /dev/null
+++ 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/QueryResourceControllerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.controller;
+
+import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.rest.service.QueryResourceService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.http.MediaType;
+import org.springframework.test.util.ReflectionTestUtils;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+
+public class QueryResourceControllerTest { // Tests QueryResourceController through a standalone MockMvc with a mocked QueryResourceService
+
+    private MockMvc mockMvc;
+    // Controller under test, wrapped in a spy so that endpoint invocations can be verified.
+    @InjectMocks
+    private QueryResourceController queryResourceController = Mockito.spy(new 
QueryResourceController());
+    @Mock
+    private QueryResourceService queryResourceService;
+    /** Initializes the mocks, builds the standalone MockMvc and sets the mocked service on the controller by reflection. */
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        mockMvc = MockMvcBuilders.standaloneSetup(queryResourceController)
+                .defaultRequest(MockMvcRequestBuilders.get("/")).build();
+        ReflectionTestUtils.setField(queryResourceController, 
"queryResourceService", queryResourceService);
+    }
+    /** PUTs a request to /adjust and expects HTTP 200, then stubs the service as available and calls the controller directly. */
+    @Test
+    public void testAdjustQueryResource() throws Exception {
+        QueryResourceService.QueryResource queryResource = new 
QueryResourceService.QueryResource();
+        queryResource.setInstance(1);
+        
mockMvc.perform(MockMvcRequestBuilders.put("/api/resource/query/adjust").contentType(MediaType.APPLICATION_JSON)
+                .content(JsonUtil.writeValueAsString(queryResource))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk());
+        // The stubs below are installed after the HTTP call, so they only affect the direct invocation that follows.
+        
Mockito.verify(queryResourceController).adjustQueryResource(Mockito.any());
+        Mockito.when(queryResourceService.isAvailable()).thenReturn(true);
+        
Mockito.when(queryResourceService.adjustQueryResource(queryResource)).thenReturn(queryResource);
+        Assert.assertEquals(queryResource.getInstance(),
+                
queryResourceController.adjustQueryResource(queryResource).getInstance());
+    }
+    /** GETs /executor and expects HTTP 200, then stubs the service and checks the size returned by a direct call. */
+    @Test
+    public void testGetExecutorSize() throws Exception {
+        
mockMvc.perform(MockMvcRequestBuilders.get("/api/resource/query/executor")
+                
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk());
+        Mockito.verify(queryResourceController).getExecutorSize();
+        Mockito.when(queryResourceService.isAvailable()).thenReturn(true);
+        Mockito.when(queryResourceService.getExecutorSize()).thenReturn(1);
+        Assert.assertEquals(1, queryResourceController.getExecutorSize());
+    }
+}
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryResourceService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryResourceService.java
new file mode 100644
index 0000000000..afb6b61aea
--- /dev/null
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryResourceService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.service;
+
+import java.util.ArrayList;
+
+import org.apache.spark.ExecutorAllocationClient;
+import org.apache.spark.sql.SparderEnv;
+import org.springframework.stereotype.Component;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+import scala.collection.JavaConverters;
+
+@Slf4j
+@Component
+public class QueryResourceService { // Requests and kills executors through the ExecutorAllocationClient held by SparderEnv
+    /** Requests resource.instance executors when it is positive, otherwise releases -resource.instance of them; returns the adjusted count together with the same force flag. */
+    public QueryResource adjustQueryResource(QueryResource resource) {
+        int adjustNum;
+        if (resource.instance > 0) {
+            adjustNum = requestExecutor(resource.instance);
+        } else {
+            adjustNum = releaseExecutor(resource.instance * -1, 
resource.force);
+        }
+        return new QueryResource(adjustNum, resource.force);
+    }
+    /** Returns the number of executor ids reported by the allocation client. */
+    public int getExecutorSize() {
+        return getExecutorAllocationClient().getExecutorIds().size();
+    }
+    /** Asks the client for {@code instance} executors; returns {@code instance} if requestExecutors returns true, otherwise 0. */
+    private int requestExecutor(int instance) {
+        val client = getExecutorAllocationClient();
+        return client.requestExecutors(instance) ? instance : 0;
+    }
+    /** Kills up to {@code instance} executors, taken in the iteration order of getExecutorIds; returns the size of the sequence returned by killExecutors, or 0 when no id was selected. */
+    private int releaseExecutor(int instance, boolean force) {
+        val client = getExecutorAllocationClient();
+        val ids = client.getExecutorIds().iterator();
+        val idsToRemoved = new ArrayList<String>();
+        while (ids.hasNext()) {
+            if (idsToRemoved.size() == instance) // enough ids collected
+                break;
+            val id = ids.next();
+            idsToRemoved.add(id);
+        }
+
+        if (idsToRemoved.isEmpty()) {
+            return 0;
+        }
+        return 
client.killExecutors(JavaConverters.asScalaBuffer(idsToRemoved).toSeq(), true, 
false, force).size(); // NOTE(review): the positional flags true/false presumably mean adjustTargetNumExecutors/countFailures - confirm against the Spark version in use
+    }
+    /** Unwraps the Option held by SparderEnv. NOTE(review): get() is called without an isDefined check here, so callers are expected to check isAvailable() first. */
+    private ExecutorAllocationClient getExecutorAllocationClient() {
+        return SparderEnv.executorAllocationClient().get();
+    }
+    /** Returns true when SparderEnv holds an allocation client and reports Spark as available; logs the result at info level. */
+    public boolean isAvailable() {
+        boolean available = SparderEnv.executorAllocationClient().isDefined() 
&& SparderEnv.isSparkAvailable();
+        log.info("node is available={}", available);
+        return available;
+    }
+    /** Payload for adjustQueryResource: on input the sign of instance selects request (positive) or release (otherwise); on output instance is the adjusted count. */
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class QueryResource {
+        private int instance;
+        private boolean force;
+    }
+}
\ No newline at end of file
diff --git 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryResourceServiceTest.java
 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryResourceServiceTest.java
new file mode 100644
index 0000000000..455a3ad5af
--- /dev/null
+++ 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryResourceServiceTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.spark.ExecutorAllocationClient;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import lombok.val;
+import lombok.var;
+import scala.collection.JavaConverters;
+
+public class QueryResourceServiceTest extends NLocalFileMetadataTestCase { // Tests QueryResourceService with a local SparkSession and a mocked ExecutorAllocationClient
+
+    private SparkSession ss;
+    @InjectMocks
+    private final QueryResourceService queryResourceService = Mockito.spy(new 
QueryResourceService());
+    // Mocked client that setUp registers with SparderEnv.
+    @Mock
+    private ExecutorAllocationClient client;
+    /** Creates test metadata, starts a local[1] SparkSession, registers it and the mocked client with SparderEnv, then runs a small aggregate query. */
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("SPARK_LOCAL_IP", "localhost");
+        MockitoAnnotations.initMocks(this);
+        createTestMetadata();
+        ss = 
SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        SparderEnv.setSparkSession(ss);
+        SparderEnv.setExecutorAllocationClient(client);
+        
Mockito.doReturn(true).when(client).isExecutorActive(Mockito.anyString());
+        ss.range(1, 10).createOrReplaceTempView("queryResourceServiceTest");
+        val data = ss.sql("SELECT id,count(0) FROM queryResourceServiceTest 
group by id");
+        data.persist();
+        data.show();
+        data.unpersist();
+    }
+    /** Cleans up the test metadata and stops the SparkSession. */
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        ss.stop();
+    }
+    /** Covers a request before and after requestExecutors is stubbed to true, a release of one executor, and a zero-instance request. */
+    @Test
+    public void testAdjustQueryResource() {
+        Assert.assertTrue(queryResourceService.isAvailable());
+        QueryResourceService.QueryResource queryResource = new 
QueryResourceService.QueryResource();
+        // requestExecutors is not stubbed yet, so the first adjusted count is expected to be 0.
+        queryResource.setInstance(1);
+        var resource = queryResourceService.adjustQueryResource(queryResource);
+        Assert.assertEquals(0, resource.getInstance());
+        Mockito.doReturn(true).when(client).requestExecutors(Mockito.anyInt());
+        resource = queryResourceService.adjustQueryResource(queryResource);
+        Assert.assertEquals(1, resource.getInstance());
+        // Negative instance: one executor is expected to be released.
+        queryResource.setInstance(-1);
+        val seqs = 
JavaConverters.asScalaBuffer(Lists.newArrayList("1")).toSeq();
+        Mockito.doReturn(seqs).when(client).getExecutorIds();
+        Assert.assertEquals(1, queryResourceService.getExecutorSize());
+        Mockito.doReturn(seqs).when(client).killExecutors(Mockito.any(), 
Mockito.anyBoolean(), Mockito.anyBoolean(),
+                Mockito.anyBoolean());
+        resource = queryResourceService.adjustQueryResource(queryResource);
+        Assert.assertEquals(1, resource.getInstance());
+        // Zero instance takes the release branch and is expected to adjust nothing.
+        queryResource.setInstance(0);
+        queryResource.setForce(true);
+        resource = queryResourceService.adjustQueryResource(queryResource);
+        Assert.assertEquals(0, resource.getInstance());
+    }
+}
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 81934163c3..3310f79a97 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -36,7 +36,7 @@ import 
org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerLogRollUp, SparkListenerTaskEnd}
+import org.apache.spark.scheduler._
 import org.apache.spark.sql.KylinSession._
 import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -46,7 +46,7 @@ import 
org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
 import org.apache.spark.sql.hive.ReplaceLocationRule
 import org.apache.spark.sql.udf.UdfManager
 import org.apache.spark.util.{ThreadUtils, Utils}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
 
 // scalastyle:off
 object SparderEnv extends Logging {
@@ -68,6 +68,8 @@ object SparderEnv extends Logging {
   @volatile
   var lastStartSparkFailureTime: Long = 0
 
+  private var _executorAllocationClient: Option[ExecutorAllocationClient] = 
None
+
   def getSparkSession: SparkSession = {
     if (spark == null || spark.sparkContext.isStopped) {
       logInfo("Init spark.")
@@ -244,6 +246,7 @@ object SparderEnv extends Logging {
           .currentThread()
           .getContextClassLoader
           .toString)
+      setExecutorAllocationClient(sparkSession.sparkContext)
       registerListener(sparkSession.sparkContext)
       registerQueryMetrics(sparkSession.sparkContext)
       APP_MASTER_TRACK_URL = null
@@ -278,6 +281,22 @@ object SparderEnv extends Logging {
     }
   }
 
+  // For tests: installs the given client directly, wrapped in Some.
+  def setExecutorAllocationClient(client: ExecutorAllocationClient): Unit = {
+    _executorAllocationClient = Some(client)
+  }
+
+  def executorAllocationClient: Option[ExecutorAllocationClient] = 
_executorAllocationClient // the currently stored client; None until one of the setters stores a client
+
+  def setExecutorAllocationClient(sc: SparkContext): Unit = { // derives the client from sc's scheduler backend
+    _executorAllocationClient = sc.schedulerBackend match {
+      case client: ExecutorAllocationClient => // backend implements ExecutorAllocationClient: keep it
+        Some(client)
+      case _ => // any other backend: store None
+        None
+    }
+  }
+
   def registerListener(sc: SparkContext): Unit = {
     val sparkListener = new SparkListener {
 

Reply via email to