This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ada7c4ee26cba3523d2af78039b1f72e5becc507 Author: luoping.zhang <luoping.zh...@kyligence.io> AuthorDate: Fri Mar 31 15:07:54 2023 +0800 KYLIN-5634 Query node capacity expansion and reduction --- .../rest/controller/QueryResourceController.java | 59 +++++++++++++ .../controller/QueryResourceControllerTest.java | 82 ++++++++++++++++++ .../kylin/rest/service/QueryResourceService.java | 90 ++++++++++++++++++++ .../rest/service/QueryResourceServiceTest.java | 96 ++++++++++++++++++++++ .../scala/org/apache/spark/sql/SparderEnv.scala | 23 +++++- 5 files changed, 348 insertions(+), 2 deletions(-) diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/QueryResourceController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/QueryResourceController.java new file mode 100644 index 0000000000..768f8ad102 --- /dev/null +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/QueryResourceController.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.rest.controller; + +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON; + +import org.apache.kylin.rest.service.QueryResourceService; +import org.apache.kylin.rest.service.QueryResourceService.QueryResource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping(value = "/api/resource/query", produces = { HTTP_VND_APACHE_KYLIN_JSON, + HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON }) +public class QueryResourceController { + + @Autowired + private QueryResourceService queryResourceService; + + @PutMapping(value = "/adjust") + @ResponseBody + public QueryResourceService.QueryResource adjustQueryResource(@RequestBody QueryResource resource) { + if (queryResourceService.isAvailable()) { + return queryResourceService.adjustQueryResource(resource); + } + return new QueryResource(); + } + + @GetMapping(value = "/executor") + @ResponseBody + public int getExecutorSize() { + if (queryResourceService.isAvailable()) { + return queryResourceService.getExecutorSize(); + } + return -1; + } + +} \ No newline at end of file diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/QueryResourceControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/QueryResourceControllerTest.java new file mode 100644 index 0000000000..ff881db697 --- /dev/null +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/QueryResourceControllerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.controller; + +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; + +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.rest.service.QueryResourceService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.http.MediaType; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.result.MockMvcResultMatchers; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; + +public class QueryResourceControllerTest { + + private MockMvc mockMvc; + + @InjectMocks + private QueryResourceController queryResourceController = Mockito.spy(new QueryResourceController()); + @Mock + private QueryResourceService queryResourceService; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + mockMvc = MockMvcBuilders.standaloneSetup(queryResourceController) + .defaultRequest(MockMvcRequestBuilders.get("/")).build(); + ReflectionTestUtils.setField(queryResourceController, "queryResourceService", queryResourceService); + } + + @Test + public void testAdjustQueryResource() throws Exception { + QueryResourceService.QueryResource queryResource = new QueryResourceService.QueryResource(); + queryResource.setInstance(1); + mockMvc.perform(MockMvcRequestBuilders.put("/api/resource/query/adjust").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(queryResource)) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + + Mockito.verify(queryResourceController).adjustQueryResource(Mockito.any()); + Mockito.when(queryResourceService.isAvailable()).thenReturn(true); + Mockito.when(queryResourceService.adjustQueryResource(queryResource)).thenReturn(queryResource); + Assert.assertEquals(queryResource.getInstance(), + queryResourceController.adjustQueryResource(queryResource).getInstance()); + } + + @Test + public void testGetExecutorSize() throws Exception { + mockMvc.perform(MockMvcRequestBuilders.get("/api/resource/query/executor") + .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + Mockito.verify(queryResourceController).getExecutorSize(); + Mockito.when(queryResourceService.isAvailable()).thenReturn(true); + Mockito.when(queryResourceService.getExecutorSize()).thenReturn(1); + Assert.assertEquals(1, queryResourceController.getExecutorSize()); + } +} diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryResourceService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryResourceService.java new file mode 100644 index 0000000000..afb6b61aea --- /dev/null +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryResourceService.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.rest.service; + +import java.util.ArrayList; + +import org.apache.spark.ExecutorAllocationClient; +import org.apache.spark.sql.SparderEnv; +import org.springframework.stereotype.Component; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.val; +import lombok.extern.slf4j.Slf4j; +import scala.collection.JavaConverters; + +@Slf4j +@Component +public class QueryResourceService { + + public QueryResource adjustQueryResource(QueryResource resource) { + int adjustNum; + if (resource.instance > 0) { + adjustNum = requestExecutor(resource.instance); + } else { + adjustNum = releaseExecutor(resource.instance * -1, resource.force); + } + return new QueryResource(adjustNum, resource.force); + } + + public int getExecutorSize() { + return getExecutorAllocationClient().getExecutorIds().size(); + } + + private int requestExecutor(int instance) { + val client = getExecutorAllocationClient(); + return client.requestExecutors(instance) ? instance : 0; + } + + private int releaseExecutor(int instance, boolean force) { + val client = getExecutorAllocationClient(); + val ids = client.getExecutorIds().iterator(); + val idsToRemoved = new ArrayList<String>(); + while (ids.hasNext()) { + if (idsToRemoved.size() == instance) + break; + val id = ids.next(); + idsToRemoved.add(id); + } + + if (idsToRemoved.isEmpty()) { + return 0; + } + return client.killExecutors(JavaConverters.asScalaBuffer(idsToRemoved).toSeq(), true, false, force).size(); + } + + private ExecutorAllocationClient getExecutorAllocationClient() { + return SparderEnv.executorAllocationClient().get(); + } + + public boolean isAvailable() { + boolean available = SparderEnv.executorAllocationClient().isDefined() && SparderEnv.isSparkAvailable(); + log.info("node is available={}", available); + return available; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class QueryResource { + private int instance; + private boolean force; + } +} \ No newline at end of file diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryResourceServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryResourceServiceTest.java new file mode 100644 index 0000000000..455a3ad5af --- /dev/null +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryResourceServiceTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.service; + +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.spark.ExecutorAllocationClient; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import lombok.val; +import lombok.var; +import scala.collection.JavaConverters; + +public class QueryResourceServiceTest extends NLocalFileMetadataTestCase { + + private SparkSession ss; + @InjectMocks + private final QueryResourceService queryResourceService = Mockito.spy(new QueryResourceService()); + + @Mock + private ExecutorAllocationClient client; + + @Before + public void setUp() throws Exception { + System.setProperty("SPARK_LOCAL_IP", "localhost"); + MockitoAnnotations.initMocks(this); + createTestMetadata(); + ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate(); + SparderEnv.setSparkSession(ss); + SparderEnv.setExecutorAllocationClient(client); + Mockito.doReturn(true).when(client).isExecutorActive(Mockito.anyString()); + ss.range(1, 10).createOrReplaceTempView("queryResourceServiceTest"); + val data = ss.sql("SELECT id,count(0) FROM queryResourceServiceTest group by id"); + data.persist(); + data.show(); + data.unpersist(); + } + + @After + public void after() throws Exception { + cleanupTestMetadata(); + ss.stop(); + } + + @Test + public void testAdjustQueryResource() { + Assert.assertTrue(queryResourceService.isAvailable()); + QueryResourceService.QueryResource queryResource = new QueryResourceService.QueryResource(); + + queryResource.setInstance(1); + var resource = queryResourceService.adjustQueryResource(queryResource); + Assert.assertEquals(0, resource.getInstance()); + Mockito.doReturn(true).when(client).requestExecutors(Mockito.anyInt()); + resource = queryResourceService.adjustQueryResource(queryResource); + Assert.assertEquals(1, resource.getInstance()); + + queryResource.setInstance(-1); + val seqs = JavaConverters.asScalaBuffer(Lists.newArrayList("1")).toSeq(); + Mockito.doReturn(seqs).when(client).getExecutorIds(); + Assert.assertEquals(1, queryResourceService.getExecutorSize()); + Mockito.doReturn(seqs).when(client).killExecutors(Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), + Mockito.anyBoolean()); + resource = queryResourceService.adjustQueryResource(queryResource); + Assert.assertEquals(1, resource.getInstance()); + + queryResource.setInstance(0); + queryResource.setForce(true); + resource = queryResourceService.adjustQueryResource(queryResource); + Assert.assertEquals(0, resource.getInstance()); + } +} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala index 81934163c3..3310f79a97 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala @@ -36,7 +36,7 @@ import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc} import org.apache.kylin.metadata.project.NProjectManager import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogRollUp, SparkListenerTaskEnd} +import org.apache.spark.scheduler._ import org.apache.spark.sql.KylinSession._ import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin import org.apache.spark.sql.catalyst.parser.ParseException @@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin import org.apache.spark.sql.hive.ReplaceLocationRule import org.apache.spark.sql.udf.UdfManager import org.apache.spark.util.{ThreadUtils, Utils} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext} // scalastyle:off object SparderEnv extends Logging { @@ -68,6 +68,8 @@ object SparderEnv extends Logging { @volatile var lastStartSparkFailureTime: Long = 0 + private var _executorAllocationClient: Option[ExecutorAllocationClient] = None + def getSparkSession: SparkSession = { if (spark == null || spark.sparkContext.isStopped) { logInfo("Init spark.") @@ -244,6 +246,7 @@ object SparderEnv extends Logging { .currentThread() .getContextClassLoader .toString) + setExecutorAllocationClient(sparkSession.sparkContext) registerListener(sparkSession.sparkContext) registerQueryMetrics(sparkSession.sparkContext) APP_MASTER_TRACK_URL = null @@ -278,6 +281,22 @@ object SparderEnv extends Logging { } } + //for test + def setExecutorAllocationClient(client: ExecutorAllocationClient): Unit = { + _executorAllocationClient = Some(client) + } + + def executorAllocationClient: Option[ExecutorAllocationClient] = _executorAllocationClient + + def setExecutorAllocationClient(sc: SparkContext): Unit = { + _executorAllocationClient = sc.schedulerBackend match { + case client: ExecutorAllocationClient => + Some(client) + case _ => + None + } + } + def registerListener(sc: SparkContext): Unit = { val sparkListener = new SparkListener {