This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new ff8b0db  KYLIN-4581 Add spark and flink engine test case for release test
ff8b0db is described below

commit ff8b0dbc8156c5a63e3895d7f8276f88c46312f8
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Fri Jun 19 11:48:25 2020 +0800

    KYLIN-4581 Add spark and flink engine test case for release test
---
 build/smoke-test/smoke-test.sh                          |  1 +
 build/smoke-test/testBuildCube.py                       | 51 ++++++++++++++------
 build/smoke-test/testQuery.py                           | 56 ++++++++++++++--------
 .../kylin/rest/controller/CubeController.java           | 25 ++++++++++
 4 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/build/smoke-test/smoke-test.sh b/build/smoke-test/smoke-test.sh
index 0a7e362..cc10889 100755
--- a/build/smoke-test/smoke-test.sh
+++ b/build/smoke-test/smoke-test.sh
@@ -69,6 +69,7 @@ sed -i 's/#*\(kylin.query.pushdown.jdbc.url*\)/\1/' conf/kylin.properties
 sed -i 's/#*\(kylin.query.pushdown.jdbc.driver*\)/\1/' conf/kylin.properties
 sed -i 's/#*\(kylin.query.pushdown.jdbc.username*\)/\1/' conf/kylin.properties
 
+${KYLIN_HOME}/bin/download-flink.sh
 ${KYLIN_HOME}/bin/kylin.sh start
 
 echo "Wait 3 minutes for service start."
diff --git a/build/smoke-test/testBuildCube.py b/build/smoke-test/testBuildCube.py
index 9452cf2..733d1eb 100644
--- a/build/smoke-test/testBuildCube.py
+++ b/build/smoke-test/testBuildCube.py
@@ -24,23 +24,39 @@ import time
 
 
 class testBuildCube(unittest.TestCase):
+
+    _base_url = "http://sandbox:7070/kylin/api"
+
+    _headers = {
+        'content-type': "application/json",
+        'authorization': "Basic QURNSU46S1lMSU4=",
+        'cache-control': "no-cache"
+    }
+
+    _clone_cube_url = _base_url + "/cubes/kylin_sales_cube/clone"
+
     def setUp(self):
-        pass
+        self.clone_cube("kylin_sales_cube_spark", "SPARK")
+        self.clone_cube("kylin_sales_cube_flink", "FLINK")
 
     def tearDown(self):
         pass
 
-    def testBuild(self):
-        base_url = "http://sandbox:7070/kylin/api"
-        url = base_url + "/cubes/kylin_sales_cube/rebuild"
-        headers = {
-            'content-type': "application/json",
-            'authorization': "Basic QURNSU46S1lMSU4=",
-            'cache-control': "no-cache"
-        }
+    def clone_cube(self, cube_name, engine_type):
+        payload = {'project': 'learn_kylin',
+                   'cubeName': cube_name}
+        response = requests.request("PUT", self._clone_cube_url, json=payload, headers=self._headers)
+        self.assertEqual(response.status_code, 200, 'Clone cube : ' + cube_name + ' failed.')
+        update_engine_url = self._base_url + "/cubes/" + cube_name + "/" + engine_type
+        response = requests.request("PUT", update_engine_url, headers=self._headers)
+        self.assertEqual(response.status_code, 200, 'Update engine type of cube : ' + cube_name + ' failed.')
+
+    def singleBuild(self, cube_name):
+
+        url = self._base_url + "/cubes/" + cube_name + "/rebuild"
 
         # reload metadata before build cubes
-        cache_response = requests.request("PUT", base_url + "/cache/all/all/update", headers=headers)
+        cache_response = requests.request("PUT", self._base_url + "/cache/all/all/update", headers=self._headers)
         self.assertEqual(cache_response.status_code, 200, 'Metadata cache not refreshed.')
 
         payload = "{\"startTime\": 1325376000000, \"endTime\": 1456790400000, \"buildType\":\"BUILD\"}"
@@ -49,7 +65,7 @@ class testBuildCube(unittest.TestCase):
         while status_code != 200 and try_time <= 3:
             print 'Submit build job, try_time = ' + str(try_time)
             try:
-                response = requests.request("PUT", url, data=payload, headers=headers)
+                response = requests.request("PUT", url, data=payload, headers=self._headers)
                 status_code = response.status_code
             except:
                 status_code = 0
@@ -64,8 +80,8 @@ class testBuildCube(unittest.TestCase):
         print 'Build job is submitted...'
         job_response = json.loads(response.text)
         job_uuid = job_response['uuid']
-        job_url = base_url + "/jobs/" + job_uuid
-        job_response = requests.request("GET", job_url, headers=headers)
+        job_url = self._base_url + "/jobs/" + job_uuid
+        job_response = requests.request("GET", job_url, headers=self._headers)
 
         self.assertEqual(job_response.status_code, 200, 'Build job information fetched failed.')
 
@@ -75,7 +91,7 @@ class testBuildCube(unittest.TestCase):
         while job_status in ('RUNNING', 'PENDING') and try_time <= 30:
             print 'Wait for job complete, try_time = ' + str(try_time)
             try:
-                job_response = requests.request("GET", job_url, headers=headers)
+                job_response = requests.request("GET", job_url, headers=self._headers)
                 job_info = json.loads(job_response.text)
                 job_status = job_info['job_status']
             except:
@@ -88,6 +104,13 @@ class testBuildCube(unittest.TestCase):
         self.assertEquals(job_status, 'FINISHED', 'Build cube failed, job status is ' + job_status)
         print 'Job complete.'
 
+    def testBuild(self):
+        self.singleBuild("kylin_sales_cube_spark")
+        self.singleBuild("kylin_sales_cube_flink")
+        self.singleBuild("kylin_sales_cube")
+        # wait for kylin_sales_cube to READY
+        time.sleep(10)
+
 
 if __name__ == '__main__':
     print 'Test Build Cube for Kylin sample.'
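The setUp() hook above clones the sample cube twice and retargets each clone's build engine through the REST API. As a standalone illustration, here is a minimal sketch of that clone-and-retarget flow, assuming the same sandbox host, the default ADMIN:KYLIN credentials, and the PUT /cubes/{cubeName}/{engineType} endpoint added by this commit (clone_and_retarget is a hypothetical helper name, not part of the commit):

    # Sketch of the clone + engine-retarget flow used in setUp() above.
    import requests

    BASE_URL = "http://sandbox:7070/kylin/api"
    HEADERS = {
        'content-type': "application/json",
        'authorization': "Basic QURNSU46S1lMSU4=",  # base64("ADMIN:KYLIN")
    }

    def clone_and_retarget(cube_name, engine_type):
        # Clone kylin_sales_cube under a new name in the learn_kylin project.
        resp = requests.put(BASE_URL + "/cubes/kylin_sales_cube/clone",
                            json={'project': 'learn_kylin', 'cubeName': cube_name},
                            headers=HEADERS)
        resp.raise_for_status()
        # Switch the clone's build engine (MR_V2, SPARK, or FLINK).
        resp = requests.put(BASE_URL + "/cubes/%s/%s" % (cube_name, engine_type),
                            headers=HEADERS)
        resp.raise_for_status()

    clone_and_retarget("kylin_sales_cube_spark", "SPARK")
    clone_and_retarget("kylin_sales_cube_flink", "FLINK")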
diff --git a/build/smoke-test/testQuery.py b/build/smoke-test/testQuery.py
index ec1702e..ddf6cdd 100644
--- a/build/smoke-test/testQuery.py
+++ b/build/smoke-test/testQuery.py
@@ -41,6 +41,7 @@ class testQuery(unittest.TestCase):
 
     def testQuery(self):
         sql_files = glob.glob('sql/*.sql')
+        cube_list = ['kylin_sales_cube', 'kylin_sales_cube_spark', 'kylin_sales_cube_flink']
         index = 0
         query_url = testQuery.base_url + "/query"
         for sql_file in sql_files:
@@ -50,26 +51,36 @@ class testQuery(unittest.TestCase):
             for sql_statement_line in sql_statement_lines:
                 if not sql_statement_line.startswith('--'):
                     sql_statement += sql_statement_line.strip() + ' '
-            payload = "{\"sql\": \"" + sql_statement.strip() + "\", \"offset\": 0, \"limit\": \"50000\", \"acceptPartial\":false, \"project\":\"learn_kylin\"}"
-            print 'Test Query #' + str(index) + ': \n' + sql_statement
-            response = requests.request("POST", query_url, data=payload, headers=testQuery.headers)
-
-            self.assertEqual(response.status_code, 200, 'Query failed.')
-            actual_result = json.loads(response.text)
-            print 'Query duration: ' + str(actual_result['duration']) + 'ms'
-            del actual_result['duration']
-            del actual_result['hitExceptionCache']
-            del actual_result['storageCacheUsed']
-            del actual_result['totalScanCount']
-            del actual_result['totalScanBytes']
-
-            expect_result = json.loads(open(sql_file[:-4] + '.json').read().strip())
-            self.assertEqual(actual_result, expect_result, 'Query result does not equal.')
-
-    def testQueryPushDown(self):
-        sql_files = glob.glob('sql/*.sql')
-        index = 0
-        url = testQuery.base_url + "/cubes/kylin_sales_cube/disable"
+            for cube_name in cube_list:
+                payload = "{\"sql\": \"" + sql_statement.strip() + "\", \"offset\": 0, \"limit\": \"50000\", " \
+                          "\"acceptPartial\":false, " \
+                          "\"project\":\"learn_kylin\", " \
+                          "\"backdoorToggles\":{" \
+                          "\"DEBUG_TOGGLE_HIT_CUBE\":\"" + cube_name + "\"}} "
+                print 'Test Query #' + str(index) + ': \n' + sql_statement
+                response = requests.request("POST", query_url, data=payload, headers=testQuery.headers)
+
+                self.assertEqual(response.status_code, 200, 'Query failed.')
+                actual_result = json.loads(response.text)
+                print 'Query duration: ' + str(actual_result['duration']) + 'ms'
+                del actual_result['duration']
+                del actual_result['hitExceptionCache']
+                del actual_result['storageCacheUsed']
+                del actual_result['totalScanCount']
+                del actual_result['totalScanBytes']
+
+                expect_result = json.loads(open(sql_file[:-4] + '.json').read().strip())
+                expect_result["cube"] = u'CUBE[name=' + cube_name + ']'
+
+                self.assertEqual(actual_result, expect_result, 'Query result does not equal.')
+
+    def disableCube(self):
+        self.disableSingleCube("kylin_sales_cube")
+        self.disableSingleCube("kylin_sales_cube_spark")
+        self.disableSingleCube("kylin_sales_cube_flink")
+
+    def disableSingleCube(self, cube_name):
+        url = testQuery.base_url + "/cubes/" + cube_name + "/disable"
         status_code = 0
         try_time = 1
         while status_code != 200 and try_time <= 3:
@@ -86,9 +97,14 @@ class testQuery(unittest.TestCase):
 
         self.assertEqual(status_code, 200, 'Disable cube failed.')
 
+    def testQueryPushDown(self):
+        self.disableCube()
         # Sleep 3 seconds to ensure cache wiped while do query pushdown
         time.sleep(3)
 
+        sql_files = glob.glob('sql/*.sql')
+        index = 0
+        query_url = testQuery.base_url + "/query"
         for sql_file in sql_files:
             index += 1
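The inner for-cube_name loop above pins each query to a specific cube via Kylin's backdoorToggles, so the Spark- and Flink-built clones are verified independently against the same expected results. A minimal sketch of the payload it builds, using json.dumps instead of manual string concatenation (query_payload is a hypothetical helper and the sample SQL is illustrative only):

    # Sketch of the per-cube query payload built in testQuery() above.
    # DEBUG_TOGGLE_HIT_CUBE forces the query to hit one named cube.
    import json

    def query_payload(sql_statement, cube_name):
        return json.dumps({
            "sql": sql_statement.strip(),
            "offset": 0,
            "limit": "50000",
            "acceptPartial": False,
            "project": "learn_kylin",
            "backdoorToggles": {"DEBUG_TOGGLE_HIT_CUBE": cube_name},
        })

    # Example: route a sample query to the Flink-built clone.
    payload = query_payload("select count(*) from kylin_sales", "kylin_sales_cube_flink")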
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a96cdda..c71bacd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -53,6 +53,7 @@ import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
@@ -781,6 +782,30 @@ public class CubeController extends BasicController {
         return cubeRequest;
     }
 
+    @RequestMapping(value = "/{cubeName}/{engineType}", method = RequestMethod.PUT)
+    @ResponseBody
+    public void updateCubeEngineType(@PathVariable String cubeName, @PathVariable String engineType) throws IOException {
+        checkCubeExists(cubeName);
+        CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+        CubeDesc desc = cube.getDescriptor();
+        int engineTypeID = desc.getEngineType();
+        switch (engineType) {
+        case "MR_V2":
+            engineTypeID = IEngineAware.ID_MR_V2;
+            break;
+        case "SPARK":
+            engineTypeID = IEngineAware.ID_SPARK;
+            break;
+        case "FLINK":
+            engineTypeID = IEngineAware.ID_FLINK;
+            break;
+        default:
+            logger.warn("Engine type {} is not support", engineType);
+        }
+        desc.setEngineType(engineTypeID);
+        cubeService.updateCubeAndDesc(cube, desc, cube.getProject(), true);
+    }
+
     /**
      * get Hbase Info
      *
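The new updateCubeEngineType endpoint resolves under the controller's /cubes mapping, which is how testBuildCube.py reaches it. A minimal sketch of calling it directly, assuming the sandbox host and default ADMIN:KYLIN credentials; per the default switch branch above, an unrecognized engine name only logs a warning and leaves the cube's current engine in place:

    # Sketch: switch a cube's build engine via the endpoint added in this commit.
    import requests

    resp = requests.put(
        "http://sandbox:7070/kylin/api/cubes/kylin_sales_cube_flink/FLINK",
        headers={'content-type': "application/json",
                 'authorization': "Basic QURNSU46S1lMSU4="})
    assert resp.status_code == 200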