This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a072088cc247e1080e4b96fd8c23dcf64f9d14a7 Author: Xuecheng Shan <xuecheng.s...@kyligence.io> AuthorDate: Fri Oct 13 11:37:50 2023 +0800 KYLIN-5853 Optimize job_info and job_lock tables --- .../src/main/resources/metadata-jdbc-h2.properties | 16 ++++++++-------- .../org/apache/kylin/job/core/lock/JdbcLockClient.java | 2 +- .../main/java/org/apache/kylin/job/dao/JobInfoDao.java | 11 +++-------- .../main/java/org/apache/kylin/job/domain/JobInfo.java | 14 ++++++-------- .../main/java/org/apache/kylin/job/domain/JobLock.java | 7 ++++--- .../org/apache/kylin/job/mapper/JobInfoMapper.java | 3 +-- .../org/apache/kylin/job/mapper/JobLockMapper.java | 2 +- .../org/apache/kylin/job/rest/JobMapperFilter.java | 5 ++--- .../java/org/apache/kylin/job/util/JobInfoUtil.java | 2 +- .../main/resources/mybatis-mapper/JobInfoMapper.xml | 18 +++++++++--------- .../main/resources/mybatis-mapper/JobLockMapper.xml | 12 ++++++------ .../src/main/resources/script/schema_job_info_h2.sql | 8 ++++---- .../main/resources/script/schema_job_info_mysql.sql | 6 +++--- .../resources/script/schema_job_info_postgresql.sql | 6 +++--- .../src/main/resources/script/schema_job_lock_h2.sql | 4 ++-- .../main/resources/script/schema_job_lock_mysql.sql | 4 ++-- .../resources/script/schema_job_lock_postgresql.sql | 4 ++-- .../java/org/apache/kylin/job/util/JobFilterUtil.java | 2 +- .../kylin/rest/service/JobResourceServiceTest.java | 5 ++--- .../org/apache/kylin/rest/service/OpsServiceTest.java | 7 +++---- .../main/java/org/apache/kylin/tool/JobInfoTool.java | 5 +---- 21 files changed, 65 insertions(+), 78 deletions(-) diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties index 96550fbbd3..dd8d7dc3c2 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties @@ -150,18 +150,18 @@ create.rawrecommendation.store.index= #### JOB METADATA 
STORE create.job.info.table=CREATE TABLE IF NOT EXISTS `%s` ( \ - `id` bigint(10) NOT NULL AUTO_INCREMENT, \ + `id` bigint(20) NOT NULL AUTO_INCREMENT, \ `job_id` varchar(100) NOT NULL, \ `mvcc` bigint(10) , \ `job_type` varchar(50) NOT NULL, \ `job_status` varchar(50) NOT NULL, \ - `project` varchar(512) NOT NULL, \ - `subject` varchar(512) NOT NULL, \ - `model_id` varchar(50) NOT NULL, \ + `project` varchar(100) NOT NULL, \ + `subject` varchar(200) NOT NULL, \ + `model_id` varchar(100) NOT NULL, \ `priority` integer DEFAULT 3, \ `job_content` longblob NOT NULL, \ - `create_time` datetime DEFAULT CURRENT_TIMESTAMP, \ - `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, \ + `create_time` bigint, \ + `update_time` bigint, \ `job_duration_millis` bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration milliseconds', \ PRIMARY KEY (`id`), \ UNIQUE KEY `uk_job_id` (`job_id`) \ @@ -173,8 +173,8 @@ create.job.lock.table=CREATE TABLE IF NOT EXISTS `%s` ( \ `lock_node` varchar(50) DEFAULT NULL COMMENT 'who locked it', \ `lock_expire_time` datetime DEFAULT NULL COMMENT 'when does the lock expire', \ `priority` integer DEFAULT 3, \ - `create_time` datetime DEFAULT CURRENT_TIMESTAMP, \ - `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, \ + `create_time` bigint, \ + `update_time` bigint, \ PRIMARY KEY (`id`), \ UNIQUE KEY `uk_lock_id` (`lock_id`) \ ) DEFAULT CHARSET=utf8; diff --git a/src/core-job/src/main/java/org/apache/kylin/job/core/lock/JdbcLockClient.java b/src/core-job/src/main/java/org/apache/kylin/job/core/lock/JdbcLockClient.java index e5fcdb91e4..5392efc44b 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/core/lock/JdbcLockClient.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/core/lock/JdbcLockClient.java @@ -109,7 +109,7 @@ public class JdbcLockClient { boolean acquired = false; try { int r = jobContext.getJobLockMapper().updateLock(jobLock.getLockId(), jobLock.getLockNode(), - 
jobLock.getRenewalSec()); + jobLock.getRenewalSec(), System.currentTimeMillis()); acquired = r > 0; return acquired; } catch (Exception e) { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java index 73ea4248f1..5a05dd2068 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java @@ -21,7 +21,6 @@ import static org.apache.kylin.job.util.JobInfoUtil.JOB_SERIALIZER; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -194,8 +193,8 @@ public class JobInfoDao { jobInfo.setSubject(subject); jobInfo.setModelId(executablePO.getTargetModel()); - jobInfo.setCreateTime(new Date(executablePO.getCreateTime())); - jobInfo.setUpdateTime(new Date(executablePO.getLastModified())); + jobInfo.setCreateTime(executablePO.getCreateTime()); + jobInfo.setUpdateTime(executablePO.getLastModified()); jobInfo.setJobContent(JobInfoUtil.serializeExecutablePO(executablePO)); ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), @@ -241,10 +240,6 @@ public class JobInfoDao { } public Long getEarliestJobCreateTime(String project) { - Date result = jobInfoMapper.getEarliestCreateTime(project); - if (null == result) { - return null; - } - return result.getTime(); + return jobInfoMapper.getEarliestCreateTime(project); } } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobInfo.java b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobInfo.java index 4dacc67e2c..92301e835b 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobInfo.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobInfo.java @@ -18,8 +18,6 @@ package org.apache.kylin.job.domain; -import java.util.Date; - public 
class JobInfo { private Long id; @@ -35,9 +33,9 @@ public class JobInfo { private String modelId; - private Date createTime; + private long createTime; - private Date updateTime; + private long updateTime; private Long jobDurationMillis; @@ -106,19 +104,19 @@ public class JobInfo { this.modelId = modelId; } - public Date getCreateTime() { + public long getCreateTime() { return createTime; } - public void setCreateTime(Date createTime) { + public void setCreateTime(long createTime) { this.createTime = createTime; } - public Date getUpdateTime() { + public long getUpdateTime() { return updateTime; } - public void setUpdateTime(Date updateTime) { + public void setUpdateTime(long updateTime) { this.updateTime = updateTime; } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java index 963a1d949c..7083c50159 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java @@ -40,9 +40,9 @@ public class JobLock { private int priority; - private Date createTime; + private long createTime; - private Date updateTime; + private long updateTime; // placeholder for mybatis ${} private String jobLockTable; @@ -52,6 +52,7 @@ public class JobLock { public JobLock(String lockId, int priority) { this.lockId = lockId; this.priority = priority; - this.createTime = new Date(); + this.createTime = System.currentTimeMillis(); + this.updateTime = System.currentTimeMillis(); } } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobInfoMapper.java b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobInfoMapper.java index c0fc0bdcae..23e6b02a9a 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobInfoMapper.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobInfoMapper.java @@ -18,7 +18,6 @@ package org.apache.kylin.job.mapper; -import 
java.util.Date; import java.util.List; import org.apache.ibatis.annotations.Mapper; @@ -51,6 +50,6 @@ public interface JobInfoMapper { long countByJobFilter(JobMapperFilter jobMapperFilter); - Date getEarliestCreateTime(@Param("project") String project); + long getEarliestCreateTime(@Param("project") String project); } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java index f879602984..c0b3c4f785 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java @@ -45,7 +45,7 @@ public interface JobLockMapper { int deleteAllJobLock(); int updateLock(@Param("lockId") String lockId, @Param("lockNode") String lockNode, - @Param("renewalSec") long renewalSec); + @Param("renewalSec") long renewalSec, @Param("updateTime") long updateTime); int removeLock(@Param("lockId") String lockId, @Param("lockNode") String lockNode); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java b/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java index 6eba7cea66..b907a5a070 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java @@ -18,7 +18,6 @@ package org.apache.kylin.job.rest; -import java.util.Date; import java.util.List; import org.apache.kylin.guava30.shaded.common.collect.Lists; @@ -43,7 +42,7 @@ public class JobMapperFilter { // jobName is jobType private List<String> jobNames; - private Date queryStartTime; + private long queryStartTime; // for sql condition: in (...) 
private List<String> subjects; @@ -72,7 +71,7 @@ public class JobMapperFilter { // placeholder for mybatis ${} private String jobInfoTable; - private List<Date> timeRange; + private List<Long> timeRange; public void setStatuses(List<ExecutableState> stateList) { statuses = stateList; diff --git a/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java index 532b755141..3b84a0284e 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java @@ -50,7 +50,7 @@ public class JobInfoUtil { public static ExecutablePO deserializeExecutablePO(JobInfo jobInfo) { ByteSource byteSource = ByteSource.wrap(jobInfo.getJobContent()); try { - return deserializeExecutablePO(byteSource, jobInfo.getUpdateTime().getTime(), jobInfo.getProject()); + return deserializeExecutablePO(byteSource, jobInfo.getUpdateTime(), jobInfo.getProject()); } catch (IOException e) { log.warn("Error when deserializing jobInfo, id: {} " + jobInfo.getJobId(), e); return null; diff --git a/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml b/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml index f969f40546..e3a00cb2bd 100644 --- a/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml +++ b/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml @@ -11,8 +11,8 @@ <result column="subject" jdbcType="VARCHAR" property="subject" /> <result column="model_id" jdbcType="VARCHAR" property="modelId" /> <result column="priority" javaType="INTEGER" property="priority" /> - <result column="create_time" jdbcType="TIMESTAMP" property="createTime" /> - <result column="update_time" jdbcType="TIMESTAMP" property="updateTime" /> + <result column="create_time" jdbcType="BIGINT" property="createTime" /> + <result column="update_time" jdbcType="BIGINT" property="updateTime" /> <result 
column="job_duration_millis" jdbcType="BIGINT" property="jobDurationMillis" /> </resultMap> <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.kylin.job.domain.JobInfo"> @@ -35,8 +35,8 @@ job_duration_millis, mvcc, job_content) values (#{id,jdbcType=BIGINT}, #{jobId,jdbcType=VARCHAR}, #{jobType,jdbcType=VARCHAR}, #{jobStatus,jdbcType=VARCHAR}, #{project,jdbcType=VARCHAR}, #{subject,jdbcType=VARCHAR}, - #{modelId,jdbcType=VARCHAR}, #{priority,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, - #{updateTime,jdbcType=TIMESTAMP}, #{jobDurationMillis,jdbcType=BIGINT}, #{mvcc}, #{jobContent,jdbcType=BINARY}) + #{modelId,jdbcType=VARCHAR}, #{priority,jdbcType=INTEGER}, #{createTime,jdbcType=BIGINT}, + #{updateTime,jdbcType=BIGINT}, #{jobDurationMillis,jdbcType=BIGINT}, #{mvcc}, #{jobContent,jdbcType=BINARY}) </insert> <delete id="deleteByJobId" parameterType="java.lang.String"> @@ -120,10 +120,10 @@ #{priority,jdbcType=INTEGER}, </if> <if test="createTime != null"> - #{createTime,jdbcType=TIMESTAMP}, + #{createTime,jdbcType=BIGINT}, </if> <if test="updateTime != null"> - #{updateTime,jdbcType=TIMESTAMP}, + #{updateTime,jdbcType=BIGINT}, </if> <if test="jobDurationMillis != null"> #{jobDurationMillis,jdbcType=BIGINT}, @@ -166,10 +166,10 @@ priority = #{priority,jdbcType=INTEGER}, </if> <if test="createTime != null"> - create_time = #{createTime,jdbcType=TIMESTAMP}, + create_time = #{createTime,jdbcType=BIGINT}, </if> <if test="updateTime != null"> - update_time = #{updateTime,jdbcType=TIMESTAMP}, + update_time = #{updateTime,jdbcType=BIGINT}, </if> <if test="jobDurationMillis != null"> job_duration_millis = #{jobDurationMillis,jdbcType=BIGINT}, @@ -241,7 +241,7 @@ </if> </select> - <select id="getEarliestCreateTime" parameterType="java.lang.String" resultType="java.util.Date"> + <select id="getEarliestCreateTime" parameterType="java.lang.String" resultType="long"> select min(create_time) from ${jobInfoTable} <where> diff --git 
a/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml b/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml index 2ceeb44c13..528f7ea425 100644 --- a/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml +++ b/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml @@ -7,8 +7,8 @@ <result column="lock_node" jdbcType="VARCHAR" property="lockNode" /> <result column="lock_expire_time" jdbcType="TIMESTAMP" property="lockExpireTime" /> <result column="priority" jdbcType="INTEGER" property="priority" /> - <result column="create_time" jdbcType="TIMESTAMP" property="createTime" /> - <result column="update_time" jdbcType="TIMESTAMP" property="updateTime" /> + <result column="create_time" jdbcType="BIGINT" property="createTime" /> + <result column="update_time" jdbcType="BIGINT" property="updateTime" /> </resultMap> <resultMap id="PriorityFistRandomOrderJob" type="org.apache.kylin.job.domain.PriorityFistRandomOrderJob"> <result column="lock_id" jdbcType="VARCHAR" property="jobId" /> @@ -41,7 +41,7 @@ lock_expire_time, priority, create_time, update_time ) values (#{id,jdbcType=BIGINT}, #{lockId,jdbcType=VARCHAR}, #{lockNode,jdbcType=VARCHAR}, - #{lockExpireTime,jdbcType=TIMESTAMP}, #{priority,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP} + #{lockExpireTime,jdbcType=TIMESTAMP}, #{priority,jdbcType=INTEGER}, #{createTime,jdbcType=BIGINT}, #{updateTime,jdbcType=BIGINT} ) </insert> <insert id="insertSelective" parameterType="org.apache.kylin.job.domain.JobLock"> @@ -86,10 +86,10 @@ #{priority,jdbcType=INTEGER}, </if> <if test="createTime != null"> - #{createTime,jdbcType=TIMESTAMP}, + #{createTime,jdbcType=BIGINT}, </if> <if test="updateTime != null"> - #{updateTime,jdbcType=TIMESTAMP}, + #{updateTime,jdbcType=BIGINT}, </if> </trim> </insert> @@ -109,7 +109,7 @@ <if test="database == 'postgresql'"> lock_expire_time = CURRENT_TIMESTAMP + interval '${renewalSec} second', </if> - update_time = 
CURRENT_TIMESTAMP + update_time = #{updateTime,jdbcType=BIGINT} where lock_id = #{lockId,jdbcType=VARCHAR} and (lock_node is null or lock_expire_time <![CDATA[<]]> CURRENT_TIMESTAMP or lock_node = #{lockNode,jdbcType=VARCHAR}) diff --git a/src/core-job/src/main/resources/script/schema_job_info_h2.sql b/src/core-job/src/main/resources/script/schema_job_info_h2.sql index a180819ab3..9335bd3502 100644 --- a/src/core-job/src/main/resources/script/schema_job_info_h2.sql +++ b/src/core-job/src/main/resources/script/schema_job_info_h2.sql @@ -21,14 +21,14 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_id varchar(100) NOT NULL, job_type varchar(50) NOT NULL, job_status varchar(50) NOT NULL, - project varchar(50) NOT NULL, - subject varchar(50) NOT NULL, + project varchar(100) NOT NULL, + subject varchar(200) NOT NULL, model_id varchar(100), priority integer DEFAULT 3, job_content longblob NOT NULL, mvcc bigint(10), - create_time timestamp DEFAULT CURRENT_TIMESTAMP, - update_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + create_time bigint, + update_time bigint, job_duration_millis bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration milliseconds', PRIMARY KEY (id), UNIQUE KEY uk_job_id (job_id) diff --git a/src/core-job/src/main/resources/script/schema_job_info_mysql.sql b/src/core-job/src/main/resources/script/schema_job_info_mysql.sql index e28469638a..63187e29b2 100644 --- a/src/core-job/src/main/resources/script/schema_job_info_mysql.sql +++ b/src/core-job/src/main/resources/script/schema_job_info_mysql.sql @@ -23,13 +23,13 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_type varchar(50) NOT NULL, job_status varchar(50) NOT NULL, project varchar(100) NOT NULL, - subject varchar(100) NOT NULL, + subject varchar(200) NOT NULL, model_id varchar(100), priority integer DEFAULT 3, mvcc bigint(10), job_content longblob NOT NULL, - create_time timestamp DEFAULT CURRENT_TIMESTAMP, - update_time timestamp DEFAULT 
CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + create_time bigint, + update_time bigint, job_duration_millis bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration milliseconds', PRIMARY KEY (id), UNIQUE KEY uk_job_id (job_id) diff --git a/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql b/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql index 0414f555b8..604a5d2f68 100644 --- a/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql +++ b/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql @@ -23,13 +23,13 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_type varchar(50) NOT NULL, job_status varchar(50) NOT NULL, project varchar(100) NOT NULL, - subject varchar(100) NOT NULL, + subject varchar(200) NOT NULL, model_id varchar(100), priority integer DEFAULT 3, mvcc bigint, job_content bytea NOT NULL, - create_time timestamptz(3) DEFAULT CURRENT_TIMESTAMP, - update_time timestamptz(3) DEFAULT CURRENT_TIMESTAMP, + create_time bigint, + update_time bigint, job_duration_millis bigint NOT NULL DEFAULT '0' ); comment on column KE_IDENTIFIED_job_info.job_duration_millis is 'total duration milliseconds'; diff --git a/src/core-job/src/main/resources/script/schema_job_lock_h2.sql b/src/core-job/src/main/resources/script/schema_job_lock_h2.sql index ee02cb29b3..7f886da928 100644 --- a/src/core-job/src/main/resources/script/schema_job_lock_h2.sql +++ b/src/core-job/src/main/resources/script/schema_job_lock_h2.sql @@ -23,8 +23,8 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( lock_node varchar(50) DEFAULT NULL COMMENT 'who locked it', lock_expire_time timestamp COMMENT 'when does the lock expire', priority integer DEFAULT 3, - create_time timestamp DEFAULT CURRENT_TIMESTAMP, - update_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + create_time bigint, + update_time bigint, PRIMARY KEY (id), UNIQUE KEY uk_lock_id (lock_id) ) DEFAULT CHARSET=utf8; diff --git 
a/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql b/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql index 388c1609e9..bd59896a94 100644 --- a/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql +++ b/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql @@ -23,8 +23,8 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( lock_node varchar(50) DEFAULT NULL COMMENT 'who locked it', lock_expire_time timestamp COMMENT 'when does the lock expire', priority integer DEFAULT 3, - create_time timestamp DEFAULT CURRENT_TIMESTAMP, - update_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + create_time bigint, + update_time bigint, PRIMARY KEY (id), UNIQUE KEY uk_lock_id (lock_id) ) AUTO_INCREMENT=10000 ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql b/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql index 4e493275ae..4142a73fdf 100644 --- a/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql +++ b/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql @@ -22,8 +22,8 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( lock_node varchar(50) DEFAULT NULL, lock_expire_time timestamptz DEFAULT NULL, priority integer DEFAULT 3, - create_time timestamptz DEFAULT CURRENT_TIMESTAMP, - update_time timestamptz DEFAULT CURRENT_TIMESTAMP + create_time bigint, + update_time bigint ); comment on column KE_IDENTIFIED_job_lock.lock_id is 'what is locked'; comment on column KE_IDENTIFIED_job_lock.lock_node is 'who locked it'; diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java b/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java index 91be102892..65e4a13060 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java +++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java @@ -93,7 +93,7 @@ public class JobFilterUtil { List<String> scheduleStateNames = scheduleStates.stream().map(executableState -> executableState.name()) .collect(Collectors.toList()); - return new JobMapperFilter(scheduleStates, jobFilter.getJobNames(), queryStartTime, + return new JobMapperFilter(scheduleStates, jobFilter.getJobNames(), queryStartTime.getTime(), Lists.newArrayList(subjects), null, jobId, null, jobFilter.getProject(), orderByField, orderType, offset, limit, JobMybatisConfig.JOB_INFO_TABLE, null); } diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java index daf05b69ef..5446304f68 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java @@ -18,7 +18,6 @@ package org.apache.kylin.rest.service; -import java.util.Date; import java.util.Map; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; @@ -119,7 +118,7 @@ public class JobResourceServiceTest extends NLocalFileMetadataTestCase { theMock.when(() -> JobContextUtil.getJobInfoDao(Mockito.any())).thenReturn(jobInfoDao); Assert.assertEquals(0, jobResourceService.getQueueNames().size()); JobInfo jobInfoNotTasks = new JobInfo(); - jobInfoNotTasks.setUpdateTime(new Date()); + jobInfoNotTasks.setUpdateTime(System.currentTimeMillis()); jobInfoNotTasks.setJobContent(JobInfoUtil.serializeExecutablePO(new ExecutablePO())); Mockito.doReturn(Lists.newArrayList(jobInfoNotTasks)).when(jobInfoDao) @@ -141,7 +140,7 @@ public class JobResourceServiceTest extends NLocalFileMetadataTestCase { info.put(ExecutableConstants.QUEUE_NAME, "test_build_queue"); subTaskOutput.setInfo(info); 
po.setTasks(Lists.newArrayList(subTask)); - jobInfo.setUpdateTime(new Date()); + jobInfo.setUpdateTime(System.currentTimeMillis()); jobInfo.setJobContent(JobInfoUtil.serializeExecutablePO(po)); Mockito.doReturn(Lists.newArrayList(jobInfo)).when(jobInfoDao).getJobInfoListByFilter(Mockito.any()); } diff --git a/src/ops-service/src/test/java/org/apache/kylin/rest/service/OpsServiceTest.java b/src/ops-service/src/test/java/org/apache/kylin/rest/service/OpsServiceTest.java index e31f829e02..f4c79e7c08 100644 --- a/src/ops-service/src/test/java/org/apache/kylin/rest/service/OpsServiceTest.java +++ b/src/ops-service/src/test/java/org/apache/kylin/rest/service/OpsServiceTest.java @@ -25,7 +25,6 @@ import static org.mockito.ArgumentMatchers.anyString; import java.io.IOException; import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -110,7 +109,7 @@ public class OpsServiceTest extends NLocalFileMetadataTestCase { JobInfo jobInfo = new JobInfo(); jobInfo.setJobId("mock_job_id"); jobInfo.setProject(project); - jobInfo.setUpdateTime(new Date()); + jobInfo.setUpdateTime(System.currentTimeMillis()); ExecutablePO originExecutable = new ExecutablePO(); originExecutable.setName("origin_executable"); byte[] originJobContent = JobInfoUtil.serializeExecutablePO(originExecutable); @@ -232,7 +231,7 @@ public class OpsServiceTest extends NLocalFileMetadataTestCase { JobInfo jobInfo = new JobInfo(); jobInfo.setJobId("mock_job_id3"); jobInfo.setProject(UnitOfWork.GLOBAL_UNIT); - jobInfo.setUpdateTime(new Date()); + jobInfo.setUpdateTime(System.currentTimeMillis()); ExecutablePO originExecutable = new ExecutablePO(); originExecutable.setName("origin_executable"); byte[] originJobContent = JobInfoUtil.serializeExecutablePO(originExecutable); @@ -316,7 +315,7 @@ public class OpsServiceTest extends NLocalFileMetadataTestCase { JobInfo jobInfo = new JobInfo(); jobInfo.setJobId("mock_job_id2"); 
jobInfo.setProject(project); - jobInfo.setUpdateTime(new Date()); + jobInfo.setUpdateTime(System.currentTimeMillis()); ExecutablePO originExecutable = new ExecutablePO(); originExecutable.setName("origin_executable"); byte[] originJobContent = JobInfoUtil.serializeExecutablePO(originExecutable); diff --git a/src/tool/src/main/java/org/apache/kylin/tool/JobInfoTool.java b/src/tool/src/main/java/org/apache/kylin/tool/JobInfoTool.java index 7d4d800f0e..8ec578ac20 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/JobInfoTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/JobInfoTool.java @@ -28,7 +28,6 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.nio.charset.Charset; -import java.sql.Date; import java.util.Arrays; import java.util.List; import java.util.zip.ZipEntry; @@ -99,9 +98,7 @@ public class JobInfoTool extends CancelableTask { } public void extractFull(File dir, long startTime, long endTime) { - Date startDate = new Date(startTime); - Date endDate = new Date(endTime); - JobMapperFilter filter = JobMapperFilter.builder().timeRange(Arrays.asList(startDate, endDate)).build(); + JobMapperFilter filter = JobMapperFilter.builder().timeRange(Arrays.asList(startTime, endTime)).build(); List<JobInfo> jobs = JobContextUtil.getJobInfoDao(KylinConfig.getInstanceFromEnv()) .getJobInfoListByFilter(filter); for (JobInfo job : jobs) {