This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new 95f399e  KYLIN-3597 fix sonar issues
95f399e is described below

commit 95f399e98957992e9a13fa1e71ee319e3018f246
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Wed Sep 26 17:21:31 2018 +0800

    KYLIN-3597 fix sonar issues
---
 .../common/persistence/HDFSResourceStore.java      |  14 +-
 .../common/persistence/JDBCConnectionManager.java  |  16 +-
 .../kylin/common/persistence/JDBCResourceDAO.java  |  26 +-
 .../common/persistence/JDBCResourceStore.java      |   3 +-
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |  49 ++--
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  95 +++-----
 .../storage/hbase/util/HbaseStreamingInput.java    | 262 ---------------------
 7 files changed, 81 insertions(+), 384 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
index 1739ce0..e5bef40 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
@@ -65,18 +65,18 @@ public class HDFSResourceStore extends ResourceStore {
         if (path == null) {
             // missing path is not expected, but don't fail it
             path = kylinConfig.getHdfsWorkingDirectory() + "tmp_metadata";
-            logger.warn("Missing path, fall back to " + path);
+            logger.warn("Missing path, fall back to {}", path);
         }
 
         fs = HadoopUtil.getFileSystem(path);
         Path metadataPath = new Path(path);
         if (fs.exists(metadataPath) == false) {
-            logger.warn("Path not exist in HDFS, create it: " + path);
+            logger.warn("Path not exist in HDFS, create it: {}", path);
             createMetaFolder(metadataPath);
         }
 
         hdfsMetaPath = metadataPath;
-        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+        logger.info("hdfs meta path : {}", hdfsMetaPath);
     }
@@ -86,7 +86,7 @@ public class HDFSResourceStore extends ResourceStore {
             fs.mkdirs(metaDirName);
         }
 
-        logger.info("hdfs meta path created: " + metaDirName.toString());
+        logger.info("hdfs meta path created: {}", metaDirName);
     }
 
     @Override
@@ -159,7 +159,7 @@ public class HDFSResourceStore extends ResourceStore {
         Path p = getRealHDFSPath(resPath);
         if (fs.exists(p) && fs.isFile(p)) {
             if (fs.getFileStatus(p).getLen() == 0) {
-                logger.warn("Zero length file: " + p.toString());
+                logger.warn("Zero length file: {}", p);
             }
             FSDataInputStream in = fs.open(p);
             long t = in.readLong();
@@ -190,9 +190,9 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
-        logger.trace("res path : " + resPath);
+        logger.trace("res path : {}", resPath);
         Path p = getRealHDFSPath(resPath);
-        logger.trace("put resource : " + p.toUri());
+        logger.trace("put resource : {}", p.toUri());
         FSDataOutputStream out = null;
         try {
             out = fs.create(p, true);
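
A note on the logging rewrite in this and the files below: SLF4J substitutes only the anonymous {} placeholder, and unlike eager string concatenation it defers building the message until the log level is known to be enabled. MessageFormat-style indexed tokens such as {0} are not interpreted by SLF4J and would be printed literally. A minimal sketch; the demo class is hypothetical, not part of this commit:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Minimal sketch of SLF4J parameterized logging (hypothetical demo class).
    public class Slf4jPlaceholderDemo {
        private static final Logger logger = LoggerFactory.getLogger(Slf4jPlaceholderDemo.class);

        public static void main(String[] args) {
            String path = "/kylin/tmp_metadata";
            // "{}" is replaced by the argument, and the string is only
            // assembled if WARN is enabled, unlike "+" concatenation.
            logger.warn("Missing path, fall back to {}", path);
            // MessageFormat-style "{0}" is NOT substituted by SLF4J and
            // is emitted literally.
            logger.warn("Missing path, fall back to {0}", path);
        }
    }
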
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
index 753601a..5f56de1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -42,15 +42,9 @@ public class JDBCConnectionManager {
 
     private static JDBCConnectionManager INSTANCE = null;
 
-    private static Object lock = new Object();
-
-    public static JDBCConnectionManager getConnectionManager() {
+    public static synchronized JDBCConnectionManager getConnectionManager() {
         if (INSTANCE == null) {
-            synchronized (lock) {
-                if (INSTANCE == null) {
-                    INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
-                }
-            }
+            INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
         }
         return INSTANCE;
     }
@@ -67,10 +61,10 @@ public class JDBCConnectionManager {
             dataSource = BasicDataSourceFactory.createDataSource(getDbcpProperties());
             Connection conn = getConn();
             DatabaseMetaData mdm = conn.getMetaData();
-            logger.info("Connected to " + mdm.getDatabaseProductName() + " " + mdm.getDatabaseProductVersion());
+            logger.info("Connected to {} {}", mdm.getDatabaseProductName(), mdm.getDatabaseProductVersion());
             closeQuietly(conn);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new IllegalArgumentException(e);
         }
     }
@@ -94,7 +88,7 @@ public class JDBCConnectionManager {
             ret.remove("passwordEncrypted");
         }
 
-        logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by user " + ret.get("username"));
+        logger.info("Connecting to Jdbc with url:{} by user {}", ret.get("url"), ret.get("username"));
 
         putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
         putIfMissing(ret, "maxActive", "5");
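
The accessor above now takes the class lock on every call, which is simpler than the removed double-checked locking and just as safe, at the cost of synchronizing each lookup. Where that cost matters, the initialization-on-demand holder idiom gives lazy, thread-safe construction without any locking; a sketch with a hypothetical class name:

    // Initialization-on-demand holder idiom (hypothetical ConnectionManager).
    // The JVM guarantees Holder is loaded, and INSTANCE constructed, exactly
    // once, on the first call to getInstance(); no explicit locking needed.
    public class ConnectionManager {
        private ConnectionManager() {
            // open pools, read config, ...
        }

        private static class Holder {
            static final ConnectionManager INSTANCE = new ConnectionManager();
        }

        public static ConnectionManager getInstance() {
            return Holder.INSTANCE;
        }
    }
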
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index a226af6..dce0894 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -32,8 +32,10 @@ import java.text.FieldPosition;
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.Locale;
+import java.util.NavigableSet;
 import java.util.TreeSet;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -116,7 +118,7 @@ public class JDBCResourceDAO {
                     if (fetchContent) {
                         try {
                             resource.setContent(getInputStream(resourcePath, rs));
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             if (!isAllowBroken) {
                                 throw new SQLException(e);
                             }
@@ -147,8 +149,8 @@ public class JDBCResourceDAO {
     }
 
     //fetch primary key only
-    public TreeSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
-        final TreeSet<String> allResourceName = new TreeSet<>();
+    public NavigableSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
+        final NavigableSet<String> allResourceName = new TreeSet<>();
         executeSql(new SqlOperation() {
             @Override
             public void execute(Connection connection) throws SQLException {
@@ -158,7 +160,7 @@ public class JDBCResourceDAO {
                 rs = pstat.executeQuery();
                 while (rs.next()) {
                     String path = rs.getString(META_TABLE_KEY);
-                    assert path.startsWith(folderPath);
+                    Preconditions.checkState(path.startsWith(folderPath));
                     if (recursive) {
                         allResourceName.add(path);
                     } else {
@@ -192,7 +194,7 @@ public class JDBCResourceDAO {
                     resource.setTimestamp(rs.getLong(META_TABLE_TS));
                     try {
                         resource.setContent(getInputStream(resPath, rs));
-                    } catch (Throwable e) {
+                    } catch (Exception e) {
                         if (!isAllowBroken) {
                             throw new SQLException(e);
                         }
@@ -240,7 +242,7 @@ public class JDBCResourceDAO {
                 if (!skipHdfs) {
                     try {
                         deleteHDFSResourceIfExist(resourcePath);
-                    } catch (Throwable e) {
+                    } catch (Exception e) {
                         throw new SQLException(e);
                     }
                 }
@@ -389,7 +391,7 @@ public class JDBCResourceDAO {
             bout = new ByteArrayOutputStream();
             IOUtils.copy(resource.getContent(), bout);
             return bout.toByteArray();
-        } catch (Throwable e) {
+        } catch (Exception e) {
             throw new SQLException(e);
         } finally {
             IOUtils.closeQuietly(bout);
@@ -635,10 +637,10 @@ public class JDBCResourceDAO {
             out = redirectFileSystem.create(redirectPath);
             out.write(largeColumn);
             return redirectPath;
-        } catch (Throwable e) {
+        } catch (Exception e) {
             try {
                 rollbackLargeCellFromHdfs(resPath);
-            } catch (Throwable ex) {
+            } catch (Exception ex) {
                 logger.error("fail to roll back resource " + resPath + " in hdfs", ex);
             }
             throw new SQLException(e);
@@ -659,12 +661,12 @@ public class JDBCResourceDAO {
                 redirectFileSystem.delete(redirectPath, true);
                 logger.warn("no backup for hdfs file {} is found, clean it", resPath);
             }
-        } catch (Throwable e) {
+        } catch (Exception e) {
             try {
                 //last try to delete redirectPath, because we prefer a deleted rather than incomplete
                 redirectFileSystem.delete(redirectPath, true);
-            } catch (Throwable ex) {
+            } catch (Exception ex) {
                 logger.error("fail to delete resource " + redirectPath + " in hdfs", ex);
             }
 
@@ -679,7 +681,7 @@ public class JDBCResourceDAO {
             if (redirectFileSystem.exists(oldPath)) {
                 redirectFileSystem.delete(oldPath, true);
             }
-        } catch (Throwable e) {
+        } catch (Exception e) {
             logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
         }
     }
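
Two patterns recur in this file: narrowing catch (Throwable) to catch (Exception), so JVM errors such as OutOfMemoryError propagate instead of being wrapped into SQLException, and replacing assert with Guava's Preconditions.checkState, which is enforced whether or not the JVM runs with -ea. A small sketch of the latter, with hypothetical values:

    import com.google.common.base.Preconditions;

    // Sketch: assert vs. Guava Preconditions (hypothetical values).
    public class PreconditionsDemo {
        public static void main(String[] args) {
            String path = "/projects/learn_kylin";
            String folderPath = "/projects/";

            // Silently skipped unless the JVM runs with assertions enabled (-ea).
            assert path.startsWith(folderPath);

            // Always evaluated; throws IllegalStateException on violation.
            Preconditions.checkState(path.startsWith(folderPath),
                    "path %s is outside folder %s", path, folderPath);
        }
    }
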
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index ea6e231..a0a58cb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -23,7 +23,6 @@ import java.io.InputStream;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.TreeSet;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -91,7 +90,7 @@ public class JDBCResourceStore extends ResourceStore {
     @Override
     protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
         try {
-            final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+            final NavigableSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
             return result.isEmpty() ? null : result;
         } catch (SQLException e) {
             throw new IOException(e);
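
Both sides of the store/DAO boundary now declare the NavigableSet interface instead of the concrete TreeSet, so the DAO can swap implementations without touching callers. A tiny sketch of the idea, with hypothetical names:

    import java.util.NavigableSet;
    import java.util.TreeSet;
    import java.util.concurrent.ConcurrentSkipListSet;

    // Sketch: expose the interface, keep the implementation private.
    public class ListingDemo {
        // Callers only depend on NavigableSet...
        static NavigableSet<String> listAll(boolean threadSafe) {
            // ...so the implementation may change behind the signature.
            return threadSafe ? new ConcurrentSkipListSet<String>() : new TreeSet<String>();
        }

        public static void main(String[] args) {
            NavigableSet<String> names = listAll(false);
            names.add("/cube_desc/kylin_sales_cube.json");
            System.out.println(names.first());
        }
    }
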
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 269833f..5f4d4be 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -90,9 +90,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
         for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
-            CubeSegmentScanner scanner;
-
-            scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
+            CubeSegmentScanner scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
                     request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                     request.getMetrics(), request.getDynFuncs(), //
                     request.getFilter(), request.getHavingFilter(), request.getContext());
@@ -121,8 +119,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
-        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
-        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+        Set<TblColRef> dimensions = new LinkedHashSet<>();
+        Set<FunctionDesc> metrics = new LinkedHashSet<>();
         buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
 
         // all dimensions = groups + other(like filter) dimensions
@@ -136,13 +134,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         otherDimsD.removeAll(groupsD);
 
         // identify cuboid
-        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+        Set<TblColRef> dimensionsD = new LinkedHashSet<>();
         dimensionsD.addAll(groupsD);
         dimensionsD.addAll(otherDimsD);
         Cuboid cuboid = findCuboid(cubeInstance, dimensionsD, metrics);
         context.setCuboid(cuboid);
 
-        // set cuboid to GridTable mapping;
+        // set cuboid to GridTable mapping
         boolean noDynamicCols;
         // dynamic dimensions
         List<TblColRef> dynGroups = Lists.newArrayList(sqlDigest.dynGroupbyColumns.keySet());
@@ -288,7 +286,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
             toCheck = filter.getChildren();
         } else {
-            return (Set<CompareTupleFilter>) Collections.EMPTY_SET;
+            return Collections.emptySet();
         }
 
         Set<CompareTupleFilter> result = Sets.newHashSet();
@@ -308,7 +306,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
     private long getQueryFilterMask(Set<TblColRef> filterColumnD) {
         long filterMask = 0;
-        logger.info("Filter column set for query: " + filterColumnD.toString());
+        logger.info("Filter column set for query: {}", filterColumnD);
         if (filterColumnD.size() != 0) {
             RowKeyColDesc[] allColumns = cubeDesc.getRowkey().getRowKeyColumns();
             for (int i = 0; i < allColumns.length; i++) {
@@ -317,7 +315,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 }
             }
         }
-        logger.info("Filter mask is: " + filterMask);
+        logger.info("Filter mask is: {}", filterMask);
         return filterMask;
     }
@@ -433,21 +431,19 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
             storageLimitLevel = StorageLimitLevel.LIMIT_ON_RETURN_SIZE;
             logger.debug(
-                    "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: "
-                            + groupsD //
-                            + " with cuboid columns: " + cuboid.getColumns());
+                    "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: {} with cuboid columns: {}",
+                    groupsD, cuboid.getColumns());
         }
 
         if (!dynGroups.isEmpty()) {
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-            logger.debug("Storage limit push down is impossible because the query has dynamic groupby " + dynGroups);
+            logger.debug("Storage limit push down is impossible because the query has dynamic groupby {}", dynGroups);
         }
 
         // derived aggregation is bad, unless expanded columns are already in group by
         if (!groups.containsAll(derivedPostAggregation)) {
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-            logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: "
-                    + derivedPostAggregation);
+            logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: {}",
+                    derivedPostAggregation);
         }
 
         if (!TupleFilter.isEvaluableRecursively(filter)) {
@@ -457,7 +453,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
         if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-            logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: " + loosenedColumnD);
+            logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: {}", loosenedColumnD);
         }
 
         if (context.hasSort()) {
@@ -469,7 +465,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         for (FunctionDesc functionDesc : functionDescs) {
             if (functionDesc.isDimensionAsMetric()) {
                 storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-                logger.debug("storageLimitLevel set to NO_LIMIT because {} isDimensionAsMetric ", functionDesc);
+                logger.debug("storageLimitLevel set to NO_LIMIT because {} isDimensionAsMetric", functionDesc);
             }
         }
@@ -488,8 +484,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
         if (!shardByInGroups.isEmpty()) {
             enabled = false;
-            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: "
-                    + shardByInGroups);
+            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: {}",
+                    shardByInGroups);
         }
 
         if (!context.isNeedStorageAggregation()) {
@@ -536,7 +532,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             return null;
 
         // OK, push down
-        logger.info("Push down having filter " + havingFilter);
+        logger.info("Push down having filter {}", havingFilter);
 
         // convert columns in the filter
         Set<TblColRef> aggrOutCols = new HashSet<>();
@@ -568,21 +564,20 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
 
         if (cuboid.requirePostAggregation()) {
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+            logger.info("exactAggregation is false because cuboid {} => {}", cuboid.getInputID(), cuboid.getId());
             return false;
         }
 
         // derived aggregation is bad, unless expanded columns are already in group by
         if (!groups.containsAll(derivedPostAggregation)) {
-            logger.info("exactAggregation is false because derived column require post aggregation: "
-                    + derivedPostAggregation);
+            logger.info("exactAggregation is false because derived column require post aggregation: {}",
+                    derivedPostAggregation);
             return false;
         }
 
         // other columns (from filter) is bad, unless they are ensured to have single value
         if (!singleValuesD.containsAll(othersD)) {
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
+            logger.info("exactAggregation is false because some column not on group by: {} (single value column: {})",
+                    othersD, singleValuesD);
             return false;
         }
@@ -610,7 +605,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             }
         }
 
-        logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
+        logger.info("exactAggregation is true, cuboid id is {}", cuboid.getId());
         return true;
     }
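
One cleanup above deserves a remark: Collections.emptySet() replaces the cast of the raw Collections.EMPTY_SET constant. The method is generic, so the element type is inferred from context and no unchecked cast is needed. A quick sketch:

    import java.util.Collections;
    import java.util.Set;

    // Sketch: typed empty set without an unchecked cast.
    public class EmptySetDemo {
        @SuppressWarnings("unchecked")
        static Set<String> rawWay() {
            // Compiles only with an unchecked cast, and produces a warning.
            return (Set<String>) Collections.EMPTY_SET;
        }

        static Set<String> typedWay() {
            // Type parameter inferred from the return type; no warning.
            return Collections.emptySet();
        }

        public static void main(String[] args) {
            System.out.println(rawWay().size() + " " + typedWay().size());
        }
    }
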
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 7205802..ca8da25 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
@@ -52,7 +52,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
@@ -108,7 +107,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             for (Long cuboid : buildingCuboids) {
                 Double cuboidSize = cuboidSizeMap.get(cuboid);
                 if (cuboidSize == null) {
-                    logger.warn(cuboid + "cuboid's size is null will replace by 0");
+                    logger.warn("{} cuboid's size is null, will replace by 0", cuboid);
                     cuboidSize = 0.0;
                 }
                 optimizedCuboidSizeMap.put(cuboid, cuboidSize);
@@ -128,7 +127,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         return 0;
     }
 
-    private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+    private void exportHBaseConfiguration(String hbaseTableName) throws IOException {
 
         Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
         HadoopUtil.healSickConfig(hbaseConf);
@@ -136,14 +135,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
         HTable table = new HTable(hbaseConf, hbaseTableName);
         HFileOutputFormat2.configureIncrementalLoadMap(job, table);
 
-        logger.info("Saving HBase configuration to " + hbaseConfPath);
+        logger.info("Saving HBase configuration to {}", hbaseConfPath);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         FSDataOutputStream out = null;
         try {
             out = fs.create(new Path(hbaseConfPath));
             job.getConfiguration().writeXml(out);
-        } catch (IOException e) {
-            throw new ExecuteException("Write hbase configuration failed", e);
         } finally {
             IOUtils.closeQuietly(out);
         }
@@ -167,7 +164,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
 
-        logger.info("Cut for HBase region is " + cut + "GB");
+        logger.info("Cut for HBase region is {} GB", cut);
 
         double totalSizeInM = 0;
         for (Double cuboidSize : cubeSizeMap.values()) {
@@ -182,7 +179,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
         nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
 
-        if (cubeSegment.isEnableSharding()) {//&& (nRegion > 1)) {
+        if (cubeSegment.isEnableSharding()) {
             //use prime nRegions to help random sharding
             int original = nRegion;
             if (nRegion == 0) {
@@ -190,22 +187,22 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             if (nRegion > Short.MAX_VALUE) {
-                logger.info("Too many regions! reduce to " + Short.MAX_VALUE);
+                logger.info("Too many regions! reduce to {}", Short.MAX_VALUE);
                 nRegion = Short.MAX_VALUE;
             }
 
             if (nRegion != original) {
                 logger.info(
-                        "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+                        "Region count is adjusted from {} to {} to help random sharding", original, nRegion);
             }
         }
 
         int mbPerRegion = (int) (totalSizeInM / nRegion);
         mbPerRegion = Math.max(1, mbPerRegion);
 
-        logger.info("Total size " + totalSizeInM + "M (estimated)");
-        logger.info("Expecting " + nRegion + " regions.");
-        logger.info("Expecting " + mbPerRegion + " MB per region.");
+        logger.info("Total size {} M (estimated)", totalSizeInM);
+        logger.info("Expecting {} regions.", nRegion);
+        logger.info("Expecting {} MB per region.", mbPerRegion);
 
         if (cubeSegment.isEnableSharding()) {
             //each cuboid will be split into different number of shards
@@ -247,42 +244,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             for (int i = 0; i < nRegion; ++i) {
-                logger.info(
-                        String.format(Locale.ROOT, "Region %d's estimated size is %.2f MB, accounting for %.2f percent",
-                                i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+                logger.info("Region {}'s estimated size is {} MB, accounting for {} percent",
+                        i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM);
             }
 
             CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
             saveHFileSplits(innerRegionSplits, mbPerRegion, hfileSplitsOutputFolder, kylinConfig);
             return getSplitsByRegionCount(nRegion);
         } else {
-            List<Long> regionSplit = Lists.newArrayList();
-
-            long size = 0;
-            int regionIndex = 0;
-            int cuboidCount = 0;
-            for (int i = 0; i < allCuboids.size(); i++) {
-                long cuboidId = allCuboids.get(i);
-                if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
-                    // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
-                    regionSplit.add(cuboidId);
-                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
-                            + " (" + cuboidCount + ") cuboids");
-                    size = 0;
-                    cuboidCount = 0;
-                    regionIndex++;
-                }
-                size += cubeSizeMap.get(cuboidId);
-                cuboidCount++;
-            }
-
-            byte[][] result = new byte[regionSplit.size()][];
-            for (int i = 0; i < regionSplit.size(); i++) {
-                result[i] = Bytes.toBytes(regionSplit.get(i));
-            }
-
-            return result;
+            throw new IllegalStateException("Not supported");
         }
     }
@@ -308,20 +278,20 @@ public class CreateHTableJob extends AbstractHadoopJob {
         }
 
         // keep the tweak for sandbox test
-        if (hfileSizeMB > 0.0 && kylinConfig.isDevEnv()) {
-            hfileSizeMB = mbPerRegion / 2;
+        if (hfileSizeMB > 0.0f && kylinConfig.isDevEnv()) {
+            hfileSizeMB = mbPerRegion / 2f;
         }
 
-        int compactionThreshold = Integer.valueOf(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
-        logger.info("hbase.hstore.compactionThreshold is " + compactionThreshold);
-        if (hfileSizeMB > 0.0 && hfileSizeMB * compactionThreshold < mbPerRegion) {
-            hfileSizeMB = mbPerRegion / compactionThreshold;
+        int compactionThreshold = Integer.parseInt(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
+        logger.info("hbase.hstore.compactionThreshold is {}", compactionThreshold);
+        if (hfileSizeMB > 0.0f && hfileSizeMB * compactionThreshold < mbPerRegion) {
+            hfileSizeMB = ((float) mbPerRegion) / compactionThreshold;
         }
 
-        if (hfileSizeMB <= 0) {
+        if (hfileSizeMB <= 0f) {
             hfileSizeMB = mbPerRegion;
         }
-        logger.info("hfileSizeMB:" + hfileSizeMB);
+        logger.info("hfileSizeMB {}", hfileSizeMB);
         final Path hfilePartitionFile = new Path(outputFolder, "part-r-00000_hfile");
         short regionCount = (short) innerRegionSplits.size();
@@ -331,7 +301,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 // skip 0
                 byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN];
                 BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                splits.add(split); // split by region;
+                splits.add(split);
             }
 
             HashMap<Long, Double> cuboidSize = innerRegionSplits.get(i);
@@ -344,8 +314,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
             for (Long cuboid : allCuboids) {
                 if (accumulatedSize >= hfileSizeMB) {
-                    logger.info(
-                            String.format(Locale.ROOT, "Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
+                    logger.info("Region {}'s hfile {} size is {} mb", i, j, accumulatedSize);
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
                     BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
                     System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
@@ -359,17 +328,17 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+        try (SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
                 SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
-                SequenceFile.Writer.valueClass(NullWritable.class));
+                SequenceFile.Writer.valueClass(NullWritable.class))) {
 
-        for (int i = 0; i < splits.size(); i++) {
-            //when we compare the rowkey, we compare the row firstly.
-            hfilePartitionWriter.append(
-                    new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
-                    NullWritable.get());
+            for (int i = 0; i < splits.size(); i++) {
+                //when we compare the rowkey, we compare the row firstly.
+                hfilePartitionWriter.append(
+                        new RowKeyWritable(KeyValueUtil.createFirstOnRow(splits.get(i), Long.MAX_VALUE)
+                                .createKeyOnly(false).getKey()),
+                        NullWritable.get());
+            }
         }
-        hfilePartitionWriter.close();
     }
 
     public static void main(String[] args) throws Exception {
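
The SequenceFile.Writer change above moves cleanup from a manual close() into try-with-resources, which closes the writer on every exit path, including when append() throws. A minimal sketch of the pattern using a plain java.io resource, with a hypothetical file name:

    import java.io.FileWriter;
    import java.io.IOException;

    // Sketch: try-with-resources closes the resource on every exit path,
    // including exceptions, replacing an explicit close() call.
    public class TryWithResourcesDemo {
        public static void main(String[] args) throws IOException {
            try (FileWriter writer = new FileWriter("partitions.txt")) {
                writer.write("split-0\n"); // if this throws, close() still runs
            } // writer.close() is invoked automatically here
        }
    }
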
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
deleted file mode 100644
index 47f4c58..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Locale;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.concurrent.Semaphore;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class HbaseStreamingInput {
-    private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
-
-    private static final int CELL_SIZE = 128 * 1024; // 128 KB
-    private static final byte[] CF = "F".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] QN = "C".getBytes(StandardCharsets.UTF_8);
-
-    public static void createTable(String tableName) throws IOException {
-        Connection conn = getConnection();
-        Admin hadmin = conn.getAdmin();
-
-        try {
-            boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName));
-            if (tableExist) {
-                logger.info("HTable '" + tableName + "' already exists");
-                return;
-            }
-
-            logger.info("Creating HTable '" + tableName + "'");
-            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-            desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
-            desc.setMemStoreFlushSize(512 << 20);//512M
-
-            HColumnDescriptor fd = new HColumnDescriptor(CF);
-            fd.setBlocksize(CELL_SIZE);
-            desc.addFamily(fd);
-            hadmin.createTable(desc);
-
-            logger.info("HTable '" + tableName + "' created");
-        } finally {
-            IOUtils.closeQuietly(conn);
-            IOUtils.closeQuietly(hadmin);
-        }
-    }
-
-    private static void scheduleJob(Semaphore semaphore, int interval) {
-        while (true) {
-            semaphore.release();
-            try {
-                Thread.sleep(interval);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public static void addData(String tableName) throws IOException {
-
-        createTable(tableName);
-
-        final Semaphore semaphore = new Semaphore(0);
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                scheduleJob(semaphore, 300000);//5 minutes a batch
-            }
-        }).start();
-
-        while (true) {
-            try {
-                semaphore.acquire();
-                int waiting = semaphore.availablePermits();
-                if (waiting > 0) {
-                    logger.warn("There are another " + waiting + " batches waiting to be added");
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                e.printStackTrace();
-            }
-
-            Connection conn = getConnection();
-            Table table = conn.getTable(TableName.valueOf(tableName));
-
-            byte[] key = new byte[8 + 4];//time + id
-
-            logger.info("============================================");
-            long startTime = System.currentTimeMillis();
-            logger.info("data load start time in millis: " + startTime);
-            logger.info("data load start at " + formatTime(startTime));
-            List<Put> buffer = Lists.newArrayList();
-            for (int i = 0; i < (1 << 10); ++i) {
-                long time = System.currentTimeMillis();
-                Bytes.putLong(key, 0, time);
-                Bytes.putInt(key, 8, i);
-                Put put = new Put(key);
-                byte[] cell = randomBytes(CELL_SIZE);
-                put.addColumn(CF, QN, cell);
-                buffer.add(put);
-            }
-            table.put(buffer);
-            table.close();
-            conn.close();
-            long endTime = System.currentTimeMillis();
-            logger.info("data load end at " + formatTime(endTime));
-            logger.info("data load time consumed: " + (endTime - startTime));
-            logger.info("============================================");
-        }
-    }
-
-    public static void randomScan(String tableName) throws IOException {
-
-        final Semaphore semaphore = new Semaphore(0);
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                scheduleJob(semaphore, 60000);//1 minutes a batch
-            }
-        }).start();
-
-        while (true) {
-            try {
-                semaphore.acquire();
-                int waiting = semaphore.drainPermits();
-                if (waiting > 0) {
Blocking " + waiting + " sets of scan requests"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - e.printStackTrace(); - } - - Random r = new Random(); - Connection conn = getConnection(); - Table table = conn.getTable(TableName.valueOf(tableName)); - - long leftBound = getFirstKeyTime(table); - long rightBound = System.currentTimeMillis(); - - for (int t = 0; t < 5; ++t) { - long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound)); - long end = start + 600000;//a period of 10 minutes - logger.info("A scan from " + formatTime(start) + " to " + formatTime(end)); - - Scan scan = new Scan(); - scan.setStartRow(Bytes.toBytes(start)); - scan.setStopRow(Bytes.toBytes(end)); - scan.addFamily(CF); - ResultScanner scanner = table.getScanner(scan); - long hash = 0; - int rowCount = 0; - for (Result result : scanner) { - Cell cell = result.getColumnLatestCell(CF, QN); - byte[] value = cell.getValueArray(); - if (cell.getValueLength() != CELL_SIZE) { - logger.error("value size invalid!!!!!"); - } - - hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(), - cell.getValueLength() + cell.getValueOffset())); - rowCount++; - } - scanner.close(); - logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash); - } - table.close(); - conn.close(); - } - } - - private static long getFirstKeyTime(Table table) throws IOException { - long startTime = 0; - - Scan scan = new Scan(); - scan.addFamily(CF); - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - Cell cell = result.getColumnLatestCell(CF, QN); - byte[] key = cell.getRowArray(); - startTime = Bytes.toLong(key, cell.getRowOffset(), 8); - logger.info("Retrieved first record time: " + formatTime(startTime)); - break;//only get first one - } - scanner.close(); - return startTime; - - } - - private static Connection getConnection() throws IOException { - return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); - } - - private static String formatTime(long time) { - DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.ROOT); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT"), Locale.ROOT); - cal.setTimeInMillis(time); - return dateFormat.format(cal.getTime()); - } - - private static byte[] randomBytes(int lenth) { - byte[] bytes = new byte[lenth]; - Random rand = new Random(); - rand.nextBytes(bytes); - return bytes; - } - - public static void main(String[] args) throws Exception { - - if (args[0].equalsIgnoreCase("createtable")) { - createTable(args[1]); - } else if (args[0].equalsIgnoreCase("adddata")) { - addData(args[1]); - } else if (args[0].equalsIgnoreCase("randomscan")) { - randomScan(args[1]); - } - } - -}