This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 8c78e7c96d095bc30b8fd7ec0745f7a49cf8658e Author: etherge <ethe...@163.com> AuthorDate: Fri Feb 14 18:27:18 2020 -0500 minor, Math operands should be cast before assignment --- .../persistence/ExponentialBackoffRetry.java | 2 +- .../cube/cuboid/algorithm/BPUSCalculator.java | 2 +- .../cube/cuboid/algorithm/CuboidStatsUtil.java | 4 +- .../dict/lookup/cache/RocksDBLookupTableCache.java | 46 +++++++++------- .../kylin/metrics/lib/impl/BlockingReservoir.java | 9 ++-- .../org/apache/kylin/storage/StorageContext.java | 2 +- .../kylin/engine/mr/streaming/ColumnToRowJob.java | 2 +- .../apache/kylin/query/relnode/OLAPProjectRel.java | 2 +- .../kylin/rest/controller/QueryController.java | 2 +- .../apache/kylin/rest/job/KylinHealthCheckJob.java | 15 +++--- .../java/org/apache/kylin/source/jdbc/SqlUtil.java | 3 +- .../kylin/storage/hbase/util/CubeMigrationCLI.java | 61 +++++++++++++++------- .../kylin/stream/coordinator/Coordinator.java | 38 +++++++------- .../coordinate/StreamingCoordinator.java | 3 +- .../storage/columnar/ColumnarSegmentStore.java | 2 +- .../columnar/FSInputGeneralColumnDataReader.java | 2 +- .../stream/server/ReplicaSetLeaderSelector.java | 2 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 6 +-- 18 files changed, 119 insertions(+), 84 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java index 315c51e..d06337b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java @@ -101,7 +101,7 @@ public class ExponentialBackoffRetry { if (retryCount == 0) firstSleepTime = System.currentTimeMillis(); - long ms = baseSleepTimeMs * (1 << retryCount); + long ms = baseSleepTimeMs * (1L << retryCount); if (ms > maxSleepTimeMs) ms = maxSleepTimeMs; diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java index 39c52da..3041af9 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java @@ -120,7 +120,7 @@ public class BPUSCalculator implements BenefitPolicy { protected double getCostSaving(long descendant, long cuboid) { long cuboidCost = getCuboidCost(cuboid); long descendantAggCost = getCuboidAggregationCost(descendant); - return descendantAggCost - cuboidCost; + return (double) descendantAggCost - cuboidCost; } protected Long getCuboidCost(long cuboid) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java index d9aaf54..1f9914b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java @@ -125,7 +125,7 @@ public class CuboidStatsUtil { nEffective++; } } - + if (nEffective != 0) srcCuboidsStats.put(cuboid, totalEstRowCount / nEffective); else @@ -349,7 +349,7 @@ public class CuboidStatsUtil { } private static double calculateRollupRatio(Pair<Long, Long> rollupStats) { - double rollupInputCount = rollupStats.getFirst() + rollupStats.getSecond(); + double rollupInputCount = (double) rollupStats.getFirst() + rollupStats.getSecond(); return rollupInputCount == 0 ? 0 : 1.0 * rollupStats.getFirst() / rollupInputCount; } } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java index c748e18..bbcaaf3 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java @@ -200,13 +200,14 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache { private void initExecutors() { this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread")); - this.cacheStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory( - "lookup-cache-state-checker")); - cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60, TimeUnit.SECONDS); // check every 10 minutes + this.cacheStateCheckExecutor = Executors + .newSingleThreadScheduledExecutor(new NamedThreadFactory("lookup-cache-state-checker")); + cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60L, TimeUnit.SECONDS); // check every 10 minutes } @Override - public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist) { + public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, + boolean buildIfNotExist) { String resourcePath = extTableSnapshotInfo.getResourcePath(); if (inBuildingTables.containsKey(resourcePath)) { logger.info("cache is in building for snapshot:" + resourcePath); @@ -215,7 +216,8 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache { CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath); if (cachedTableInfo == null) { if (buildIfNotExist) { - buildSnapshotCache(tableDesc, extTableSnapshotInfo, getSourceLookupTable(tableDesc, extTableSnapshotInfo)); + buildSnapshotCache(tableDesc, extTableSnapshotInfo, + getSourceLookupTable(tableDesc, extTableSnapshotInfo)); } logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath()); return null; @@ -231,14 +233,16 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache { } @Override - public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, final ILookupTable sourceTable) { + public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, + final ILookupTable sourceTable) { if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) { logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building", extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize()); return; } final String[] keyColumns = extTableSnapshotInfo.getKeyColumns(); - final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId()); + final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), + extTableSnapshotInfo.getId()); final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId()); final String snapshotResPath = extTableSnapshotInfo.getResourcePath(); @@ -278,8 +282,8 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache { if (inBuildingTables.containsKey(resourcePath)) { return CacheState.IN_BUILDING; } - File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(), - extTableSnapshotInfo.getId())); + File stateFile = getCacheStateFile( + getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId())); if (!stateFile.exists()) { return CacheState.NONE; } @@ -301,14 +305,14 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache { } private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) { - File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(), - extTableSnapshotInfo.getId())); + File stateFile = getCacheStateFile( + getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId())); try { Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8); tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath)); } catch (IOException e) { - throw new RuntimeException("error when write cache state for snapshot:" - + extTableSnapshotInfo.getResourcePath()); + throw new RuntimeException( + "error when write cache state for snapshot:" + extTableSnapshotInfo.getResourcePath()); } } @@ -347,17 +351,19 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache { } } - final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths(); + final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config) + .getAllExtSnapshotResPaths(); - List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(FluentIterable.from( - allCachedSnapshots).filter(new Predicate<Pair<String, File>>() { - @Override + List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList( + FluentIterable.from(allCachedSnapshots).filter(new Predicate<Pair<String, File>>() { + @Override public boolean apply(@Nullable Pair<String, File> input) { long lastModified = input.getSecond().lastModified(); return !activeSnapshotSet.contains(input.getFirst()) && lastModified > 0 - && lastModified < (System.currentTimeMillis() - config.getExtTableSnapshotLocalCacheCheckVolatileRange()); - } - })); + && lastModified < (System.currentTimeMillis() + - config.getExtTableSnapshotLocalCacheCheckVolatileRange()); + } + })); for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) { File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond(); logger.info("removed cache file:{}, it is not referred by any cube", diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java index d754b19..22f199a 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java @@ -161,9 +161,10 @@ public class BlockingReservoir extends AbstractActiveReservoir { startTime = System.currentTimeMillis(); continue; } else if (size() < minReportSize && (System.currentTimeMillis() - startTime < maxReportTime)) { - logger.info("The number of records in the blocking queue is less than {} and " + - "the duration from last reporting is less than {} ms. " + - "Will delay to report!", minReportSize, maxReportTime); + logger.info( + "The number of records in the blocking queue is less than {} and " + + "the duration from last reporting is less than {} ms. " + "Will delay to report!", + minReportSize, maxReportTime); sleep(); continue; } @@ -177,7 +178,7 @@ public class BlockingReservoir extends AbstractActiveReservoir { private void sleep() { try { - Thread.sleep(60 * 1000); + Thread.sleep(60 * 1000L); } catch (InterruptedException e) { logger.warn("Interrupted during running"); } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 5d2d06f..f763605 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -155,7 +155,7 @@ public class StorageContext { return; } - long temp = this.getOffset() + this.getLimit(); + long temp = this.getOffset() + (long) this.getLimit(); if (!isValidPushDownLimit(temp)) { logger.warn("Not enabling limit push down because current limit is invalid: " + this.getLimit()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java index 68070eb..2ca4ce4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; public class ColumnToRowJob extends AbstractHadoopJob { private static final Logger logger = LoggerFactory.getLogger(ColumnToRowJob.class); - private static final long DEFAULT_SIZE_PER_REDUCER = 16 * 1024 * 1024; + private static final long DEFAULT_SIZE_PER_REDUCER = 16 * 1024 * 1024L; private static final int MAX_REDUCERS = 1000; @Override diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java index 8be7249..155a586 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java @@ -136,7 +136,7 @@ public class OLAPProjectRel extends Project implements OLAPRel { public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { boolean hasRexOver = RexOver.containsOver(getProjects(), null); RelOptCost relOptCost = super.computeSelfCost(planner, mq).multiplyBy(.05) - .multiplyBy(getProjects().size() * (hasRexOver ? 50 : 1)) + .multiplyBy(getProjects().size() * (double) (hasRexOver ? 50 : 1)) .plus(planner.getCostFactory().makeCost(0.1 * caseCount, 0, 0)); return planner.getCostFactory().makeCost(relOptCost.getRows(), 0, 0); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java index da0a1e5..07a1da9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java @@ -208,7 +208,7 @@ public class QueryController extends BasicController { if (runTimeMoreThan == -1) { return QueryContextFacade.getAllRunningQueries(); } else { - return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000); + return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000L); } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java index ec2d263..ea3bbc2 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java @@ -18,7 +18,12 @@ package org.apache.kylin.rest.job; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Locale; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -47,11 +52,7 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; -import java.util.Locale; +import com.google.common.collect.Lists; public class KylinHealthCheckJob extends AbstractApplication { private static final Logger logger = LoggerFactory.getLogger(KylinHealthCheckJob.class); @@ -288,7 +289,7 @@ public class KylinHealthCheckJob extends AbstractApplication { long sizeRecordSize = cube.getInputRecordSizeBytes(); if (sizeRecordSize > 0) { long cubeDataSize = cube.getSizeKB() * 1024; - double expansionRate = cubeDataSize / sizeRecordSize; + double expansionRate = (double) cubeDataSize / sizeRecordSize; if (sizeRecordSize > 1L * expansionCheckMinCubeSizeInGb * 1024 * 1024 * 1024) { if (expansionRate > warningExpansionRate) { logger.info("Cube: {} in project: {} with too large expansion rate: {}, cube data size: {}G", diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java index 9299d78..ea3d0f1 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.Random; + import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.source.hive.DBConnConf; import org.slf4j.Logger; @@ -84,7 +85,7 @@ public class SqlUtil { logger.warn("while use:" + dbconf, e); try { int rt = r.nextInt(10); - Thread.sleep(rt * 1000); + Thread.sleep(rt * 1000L); } catch (InterruptedException e1) { Thread.interrupted(); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 0bd60d5..ee05178 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -97,7 +97,8 @@ public class CubeMigrationCLI { private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; public static void main(String[] args) throws IOException, InterruptedException { - logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead"); + logger.warn( + "org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead"); if (args.length != 8) { usage(); @@ -108,12 +109,22 @@ public class CubeMigrationCLI { } private static void usage() { - System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); - System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overw [...] + System.out.println( + "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); + System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" + + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" + + "cubeName: the name of cube to be migrated. \n" + + "projectName: The target project in the target environment.(Make sure it exist) \n" + + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); } - public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { + public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, + String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) + throws IOException, InterruptedException { srcConfig = srcCfg; srcStore = ResourceStore.getStore(srcConfig); @@ -163,12 +174,16 @@ public class CubeMigrationCLI { } } - public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { + public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, + String purgeAndDisable, String overwriteIfExists, String realExecute) + throws IOException, InterruptedException { - moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); + moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, + projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); } - public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { + public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) + throws IOException { CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); checkCLI.execute(cubeName); } @@ -198,12 +213,14 @@ public class CubeMigrationCLI { private static void changeHtableHost(CubeInstance cube) { for (CubeSegment segment : cube.getSegments()) { - operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); + operations + .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); } } private static void copyACL(CubeInstance cube, String projectName) { - operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); + operations.add(new Opt(OptType.COPY_ACL, + new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); } private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException { @@ -213,7 +230,8 @@ public class CubeMigrationCLI { listCubeRelatedResources(cube, metaItems, dictAndSnapshot); if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true")) - throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it"); + throw new IllegalStateException("The cube named " + cube.getName() + + " already exists on target metadata store. Use overwriteIfExists to overwrite it"); for (String item : metaItems) { operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item })); @@ -224,7 +242,8 @@ public class CubeMigrationCLI { } } - private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { + private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) + throws IOException { String projectResPath = ProjectInstance.concatResourcePath(projectName); if (!dstStore.exists(projectResPath)) throw new IllegalStateException("The target project " + projectName + "does not exist"); @@ -236,7 +255,8 @@ public class CubeMigrationCLI { operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); } - private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException { + private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, + Set<String> dictAndSnapshot) throws IOException { CubeDesc cubeDesc = cube.getDescriptor(); metaResource.add(cube.getResourcePath()); @@ -443,8 +463,10 @@ public class CubeMigrationCLI { Table srcAclHtable = null; Table destAclHtable = null; try { - srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()) + .getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()) + .getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); // cube acl Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); @@ -455,8 +477,10 @@ public class CubeMigrationCLI { byte[] value = CellUtil.cloneValue(cell); // use the target project uuid as the parent - if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { - String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; + if (Bytes.toString(family).equals(ACL_INFO_FAMILY) + && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { + String valueString = "{\"id\":\"" + projUUID + + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; value = Bytes.toBytes(valueString); } Put put = new Put(Bytes.toBytes(cubeId)); @@ -531,7 +555,8 @@ public class CubeMigrationCLI { String modelId = (String) opt.params[1]; Table destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()) + .getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); @@ -572,7 +597,7 @@ public class CubeMigrationCLI { if (nRetry > 3) { throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath); } else { - Thread.sleep(sleepTime * nRetry * nRetry); + Thread.sleep((long) sleepTime * nRetry * nRetry); } } } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java index 938c0b4..a0be4b1 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java @@ -37,10 +37,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; +import javax.annotation.Nullable; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; @@ -68,23 +66,23 @@ import org.apache.kylin.stream.coordinator.assign.Assigner; import org.apache.kylin.stream.coordinator.assign.AssignmentUtil; import org.apache.kylin.stream.coordinator.assign.AssignmentsCache; import org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner; +import org.apache.kylin.stream.coordinator.assign.DefaultAssigner; +import org.apache.kylin.stream.coordinator.client.CoordinatorClient; import org.apache.kylin.stream.coordinator.exception.ClusterStateException; -import org.apache.kylin.stream.coordinator.exception.StoreException; -import org.apache.kylin.stream.coordinator.exception.ClusterStateException.TransactionStep; import org.apache.kylin.stream.coordinator.exception.ClusterStateException.ClusterState; +import org.apache.kylin.stream.coordinator.exception.ClusterStateException.TransactionStep; import org.apache.kylin.stream.coordinator.exception.CoordinateException; import org.apache.kylin.stream.coordinator.exception.NotLeadCoordinatorException; -import org.apache.kylin.stream.coordinator.assign.DefaultAssigner; -import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol; -import org.apache.kylin.stream.core.model.CubeAssignment; -import org.apache.kylin.stream.core.model.ReplicaSet; -import org.apache.kylin.stream.coordinator.client.CoordinatorClient; +import org.apache.kylin.stream.coordinator.exception.StoreException; import org.apache.kylin.stream.core.client.HttpReceiverAdminClient; import org.apache.kylin.stream.core.client.ReceiverAdminClient; +import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol; import org.apache.kylin.stream.core.model.AssignRequest; import org.apache.kylin.stream.core.model.ConsumerStatsResponse; +import org.apache.kylin.stream.core.model.CubeAssignment; import org.apache.kylin.stream.core.model.Node; import org.apache.kylin.stream.core.model.PauseConsumersRequest; +import org.apache.kylin.stream.core.model.ReplicaSet; import org.apache.kylin.stream.core.model.ResumeConsumerRequest; import org.apache.kylin.stream.core.model.SegmentBuildState; import org.apache.kylin.stream.core.model.StartConsumersRequest; @@ -97,21 +95,23 @@ import org.apache.kylin.stream.core.source.ISourcePositionHandler; import org.apache.kylin.stream.core.source.ISourcePositionHandler.MergeStrategy; import org.apache.kylin.stream.core.source.IStreamingSource; import org.apache.kylin.stream.core.source.Partition; -import org.apache.kylin.stream.core.source.StreamingTableSourceInfo; import org.apache.kylin.stream.core.source.StreamingSourceFactory; +import org.apache.kylin.stream.core.source.StreamingTableSourceInfo; import org.apache.kylin.stream.core.util.HDFSUtil; import org.apache.kylin.stream.core.util.NamedThreadFactory; import org.apache.kylin.stream.core.util.NodeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import javax.annotation.Nullable; - /** * <pre> * Each Kylin streaming cluster has at least one coordinator processes/server, coordinator @@ -1226,8 +1226,8 @@ public class Coordinator implements CoordinatorClient { private boolean isInOptimize(CubeInstance cube) { Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); if (readyPendingSegments.size() > 0) { - logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", - cube.getName(), readyPendingSegments); + logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", cube.getName(), + readyPendingSegments); return true; } Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW); @@ -1240,7 +1240,9 @@ public class Coordinator implements CoordinatorClient { if (job != null && job instanceof CubingJob) { CubingJob cubingJob = (CubingJob) job; if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) { - logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName()); + logger.info( + "The cube {} is in optimization. It's not allowed to build new segments during optimization.", + cube.getName()); return true; } } @@ -1333,7 +1335,7 @@ public class Coordinator implements CoordinatorClient { restoreJobStatusChecker(); while (true) { try { - Thread.sleep(5 * 60 * 1000); + Thread.sleep(5 * 60 * 1000L); } catch (InterruptedException exception) { Thread.interrupted(); break; diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java index 20e4947..54553a8 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java @@ -146,7 +146,6 @@ public class StreamingCoordinator implements CoordinatorClient { clusterStateCheckExecutor.scheduleAtFixedRate(clusterDoctor, 5, 10, TimeUnit.MINUTES); } - /** * Assign the streaming cube to replica sets. Replica sets is calculated by Assigner. * @@ -630,7 +629,7 @@ public class StreamingCoordinator implements CoordinatorClient { buildJobSubmitter.restore(); while (true) { try { - Thread.sleep(5 * 60 * 1000); + Thread.sleep(5 * 60 * 1000L); } catch (InterruptedException exception) { Thread.interrupted(); break; diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java index 5982065..dde1be2 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java @@ -247,7 +247,7 @@ public class ColumnarSegmentStore implements IStreamingSegmentStore { List<DataSegmentFragment> result = Lists.newArrayList(); int originFragmentsNum = allFragments.size(); int minFragments = config.getStreamingMinFragmentsInSegment(); - long maxFragmentSize = config.getStreamingMaxFragmentSizeInMb() * 1024 * 1024; + long maxFragmentSize = config.getStreamingMaxFragmentSizeInMb() * 1024 * 1024L; long toMergeDataSize = 0; for (int i = 0; i < originFragmentsNum; i++) { DataSegmentFragment fragment = allFragments.get(i); diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java index bbe06ae..214ece3 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java @@ -30,7 +30,7 @@ public class FSInputGeneralColumnDataReader implements ColumnDataReader { public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength) throws IOException { this.fsInputStream = fsInputStream; - fsInputStream.seek(dataStartOffset + dataLength - 4); + fsInputStream.seek(dataStartOffset + dataLength - 4L); this.numOfVals = fsInputStream.readInt(); fsInputStream.seek(dataStartOffset); } diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java index e7bdbde..5419a3b 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java @@ -73,7 +73,7 @@ public class ReplicaSetLeaderSelector extends LeaderSelectorListenerAdapter impl } while (true) { try { - Thread.sleep(5 * 60 * 1000); + Thread.sleep(5 * 60 * 1000L); } catch (InterruptedException exception) { Thread.interrupted(); break; diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index ce3b203..9212d08 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -201,7 +201,7 @@ public class CubeMigrationCLI extends AbstractApplication { showOpts(); } } - + public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); checkCLI.execute(cubeName); @@ -632,7 +632,7 @@ public class CubeMigrationCLI extends AbstractApplication { } } } - + private String renameTableWithinProject(String srcItem) { if (dstProject != null && srcItem.contains(ResourceStore.TABLE_RESOURCE_ROOT)) { String tableIdentity = TableDesc.parseResourcePath(srcItem).getTable(); @@ -670,7 +670,7 @@ public class CubeMigrationCLI extends AbstractApplication { if (nRetry > 3) { throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath); } else { - Thread.sleep(sleepTime * nRetry * nRetry); + Thread.sleep((long) sleepTime * nRetry * nRetry); } } }