This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ff8b0db  KYLIN-4581 Add spark and flink engine test case for release 
test
ff8b0db is described below

commit ff8b0dbc8156c5a63e3895d7f8276f88c46312f8
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Fri Jun 19 11:48:25 2020 +0800

    KYLIN-4581 Add spark and flink engine test case for release test
---
 build/smoke-test/smoke-test.sh                     |  1 +
 build/smoke-test/testBuildCube.py                  | 51 ++++++++++++++------
 build/smoke-test/testQuery.py                      | 56 ++++++++++++++--------
 .../kylin/rest/controller/CubeController.java      | 25 ++++++++++
 4 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/build/smoke-test/smoke-test.sh b/build/smoke-test/smoke-test.sh
index 0a7e362..cc10889 100755
--- a/build/smoke-test/smoke-test.sh
+++ b/build/smoke-test/smoke-test.sh
@@ -69,6 +69,7 @@ sed -i 's/#*\(kylin.query.pushdown.jdbc.url*\)/\1/' 
conf/kylin.properties
 sed -i 's/#*\(kylin.query.pushdown.jdbc.driver*\)/\1/' conf/kylin.properties
 sed -i 's/#*\(kylin.query.pushdown.jdbc.username*\)/\1/' conf/kylin.properties
 
+${KYLIN_HOME}/bin/download-flink.sh
 ${KYLIN_HOME}/bin/kylin.sh start
 
 echo "Wait 3 minutes for service start."
diff --git a/build/smoke-test/testBuildCube.py 
b/build/smoke-test/testBuildCube.py
index 9452cf2..733d1eb 100644
--- a/build/smoke-test/testBuildCube.py
+++ b/build/smoke-test/testBuildCube.py
@@ -24,23 +24,39 @@ import time
 
 
 class testBuildCube(unittest.TestCase):
+
+    _base_url = "http://sandbox:7070/kylin/api";
+
+    _headers = {
+        'content-type': "application/json",
+        'authorization': "Basic QURNSU46S1lMSU4=",
+        'cache-control': "no-cache"
+    }
+
+    _clone_cube_url = _base_url + "/cubes/kylin_sales_cube/clone"
+
     def setUp(self):
-        pass
+        self.clone_cube("kylin_sales_cube_spark", "SPARK")
+        self.clone_cube("kylin_sales_cube_flink", "FLINK")
 
     def tearDown(self):
         pass
 
-    def testBuild(self):
-        base_url = "http://sandbox:7070/kylin/api";
-        url = base_url + "/cubes/kylin_sales_cube/rebuild"
-        headers = {
-            'content-type': "application/json",
-            'authorization': "Basic QURNSU46S1lMSU4=",
-            'cache-control': "no-cache"
-        }
+    def clone_cube(self, cube_name, engine_type):
+        payload = {'project': 'learn_kylin',
+                   'cubeName': cube_name}
+        response = requests.request("PUT", self._clone_cube_url, json=payload, 
headers=self._headers)
+        self.assertEqual(response.status_code, 200, 'Clone cube : ' + 
cube_name + ' failed.')
+        update_engine_url = self._base_url + "/cubes/" + cube_name + "/" + 
engine_type
+        response = requests.request("PUT", update_engine_url, 
headers=self._headers)
+        self.assertEqual(response.status_code, 200, 'Update engine type of 
cube : ' + cube_name + ' failed.')
+
+    def singleBuild(self, cube_name):
+
+        url = self._base_url + "/cubes/" + cube_name + "/rebuild"
 
         # reload metadata before build cubes
-        cache_response = requests.request("PUT", base_url + 
"/cache/all/all/update", headers=headers)
+        cache_response = requests.request("PUT", self._base_url + 
"/cache/all/all/update", headers=self._headers)
         self.assertEqual(cache_response.status_code, 200, 'Metadata cache not 
refreshed.')
 
         payload = "{\"startTime\": 1325376000000, \"endTime\": 1456790400000, 
\"buildType\":\"BUILD\"}"
@@ -49,7 +65,7 @@ class testBuildCube(unittest.TestCase):
         while status_code != 200 and try_time <= 3:
             print 'Submit build job, try_time = ' + str(try_time)
             try:
-                response = requests.request("PUT", url, data=payload, 
headers=headers)
+                response = requests.request("PUT", url, data=payload, 
headers=self._headers)
                 status_code = response.status_code
             except:
                 status_code = 0
@@ -64,8 +80,8 @@ class testBuildCube(unittest.TestCase):
             print 'Build job is submitted...'
             job_response = json.loads(response.text)
             job_uuid = job_response['uuid']
-            job_url = base_url + "/jobs/" + job_uuid
-            job_response = requests.request("GET", job_url, headers=headers)
+            job_url = self._base_url + "/jobs/" + job_uuid
+            job_response = requests.request("GET", job_url, 
headers=self._headers)
 
             self.assertEqual(job_response.status_code, 200, 'Build job 
information fetched failed.')
 
@@ -75,7 +91,7 @@ class testBuildCube(unittest.TestCase):
             while job_status in ('RUNNING', 'PENDING') and try_time <= 30:
                 print 'Wait for job complete, try_time = ' + str(try_time)
                 try:
-                    job_response = requests.request("GET", job_url, 
headers=headers)
+                    job_response = requests.request("GET", job_url, 
headers=self._headers)
                     job_info = json.loads(job_response.text)
                     job_status = job_info['job_status']
                 except:
@@ -88,6 +104,13 @@ class testBuildCube(unittest.TestCase):
             self.assertEquals(job_status, 'FINISHED', 'Build cube failed, job 
status is ' + job_status)
             print 'Job complete.'
 
+    def testBuild(self):
+        self.singleBuild("kylin_sales_cube_spark")
+        self.singleBuild("kylin_sales_cube_flink")
+        self.singleBuild("kylin_sales_cube")
+        # wait for kylin_sales_cube to READY
+        time.sleep(10)
+
 
 if __name__ == '__main__':
     print 'Test Build Cube for Kylin sample.'
diff --git a/build/smoke-test/testQuery.py b/build/smoke-test/testQuery.py
index ec1702e..ddf6cdd 100644
--- a/build/smoke-test/testQuery.py
+++ b/build/smoke-test/testQuery.py
@@ -41,6 +41,7 @@ class testQuery(unittest.TestCase):
     def testQuery(self):
 
         sql_files = glob.glob('sql/*.sql')
+        cube_list = ['kylin_sales_cube', 'kylin_sales_cube_spark', 
'kylin_sales_cube_flink']
         index = 0
         query_url = testQuery.base_url + "/query"
         for sql_file in sql_files:
@@ -50,26 +51,36 @@ class testQuery(unittest.TestCase):
             for sql_statement_line in sql_statement_lines:
                 if not sql_statement_line.startswith('--'):
                     sql_statement += sql_statement_line.strip() + ' '
-            payload = "{\"sql\": \"" + sql_statement.strip() + "\", 
\"offset\": 0, \"limit\": \"50000\", \"acceptPartial\":false, 
\"project\":\"learn_kylin\"}"
-            print 'Test Query #' + str(index) + ': \n' + sql_statement
-            response = requests.request("POST", query_url, data=payload, 
headers=testQuery.headers)
-
-            self.assertEqual(response.status_code, 200, 'Query failed.')
-            actual_result = json.loads(response.text)
-            print 'Query duration: ' + str(actual_result['duration']) + 'ms'
-            del actual_result['duration']
-            del actual_result['hitExceptionCache']
-            del actual_result['storageCacheUsed']
-            del actual_result['totalScanCount']
-            del actual_result['totalScanBytes']
-
-            expect_result = json.loads(open(sql_file[:-4] + 
'.json').read().strip())
-            self.assertEqual(actual_result, expect_result, 'Query result does 
not equal.')
-
-    def testQueryPushDown(self):
-        sql_files = glob.glob('sql/*.sql')
-        index = 0
-        url = testQuery.base_url + "/cubes/kylin_sales_cube/disable"
+            for cube_name in cube_list:
+                payload = "{\"sql\": \"" + sql_statement.strip() + "\", 
\"offset\": 0, \"limit\": \"50000\", " \
+                                                                   
"\"acceptPartial\":false, " \
+                                                                   
"\"project\":\"learn_kylin\", " \
+                                                                   
"\"backdoorToggles\":{" \
+                                                                   
"\"DEBUG_TOGGLE_HIT_CUBE\":\"" + cube_name + "\"}} "
+                print 'Test Query #' + str(index) + ': \n' + sql_statement
+                response = requests.request("POST", query_url, data=payload, 
headers=testQuery.headers)
+
+                self.assertEqual(response.status_code, 200, 'Query failed.')
+                actual_result = json.loads(response.text)
+                print 'Query duration: ' + str(actual_result['duration']) + 
'ms'
+                del actual_result['duration']
+                del actual_result['hitExceptionCache']
+                del actual_result['storageCacheUsed']
+                del actual_result['totalScanCount']
+                del actual_result['totalScanBytes']
+
+                expect_result = json.loads(open(sql_file[:-4] + 
'.json').read().strip())
+                expect_result["cube"] = u'CUBE[name=' + cube_name + ']'
+
+                self.assertEqual(actual_result, expect_result, 'Query result 
does not equal.')
+
+    def disableCube(self):
+        self.disableSingleCube("kylin_sales_cube")
+        self.disableSingleCube("kylin_sales_cube_spark")
+        self.disableSingleCube("kylin_sales_cube_flink")
+
+    def disableSingleCube(self, cube_name):
+        url = testQuery.base_url + "/cubes/" + cube_name + "/disable"
         status_code = 0
         try_time = 1
         while status_code != 200 and try_time <= 3:
@@ -86,9 +97,14 @@ class testQuery(unittest.TestCase):
 
         self.assertEqual(status_code, 200, 'Disable cube failed.')
 
+    def testQueryPushDown(self):
+        self.disableCube()
         # Sleep 3 seconds to ensure cache wiped while do query pushdown
         time.sleep(3)
 
+        sql_files = glob.glob('sql/*.sql')
+        index = 0
+
         query_url = testQuery.base_url + "/query"
         for sql_file in sql_files:
             index += 1
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a96cdda..c71bacd 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -53,6 +53,7 @@ import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
@@ -781,6 +782,30 @@ public class CubeController extends BasicController {
         return cubeRequest;
     }
 
+    @RequestMapping(value = "/{cubeName}/{engineType}", method = 
RequestMethod.PUT)
+    @ResponseBody
+    public void updateCubeEngineType(@PathVariable String cubeName, 
@PathVariable String engineType) throws IOException {
+        checkCubeExists(cubeName);
+        CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+        CubeDesc desc = cube.getDescriptor();
+        int engineTypeID = desc.getEngineType();
+        switch(engineType) {
+            case "MR_V2":
+                engineTypeID = IEngineAware.ID_MR_V2;
+                break;
+            case "SPARK":
+                engineTypeID = IEngineAware.ID_SPARK;
+                break;
+            case "FLINK":
+                engineTypeID = IEngineAware.ID_FLINK;
+                break;
+            default:
+                logger.warn("Engine type {} is not support", engineType);
+        }
+        desc.setEngineType(engineTypeID);
+        cubeService.updateCubeAndDesc(cube, desc, cube.getProject(), true);
+    }
+
     /**
      * get Hbase Info
      *

Reply via email to