This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit db866b7d0a9e6a1911a367e393c9c1f9018fa71f Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Mon Jul 27 23:40:56 2020 +0800 KYLIN-4660 remove some unsupported features - Advanced snapshot - Shrunken dictionary --- .../java/org/apache/kylin/cube/CubeManager.java | 18 +- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 6 +- .../java/org/apache/kylin/cube/model/CubeDesc.java | 18 - .../apache/kylin/cube/model/SnapshotTableDesc.java | 12 - .../org/apache/kylin/dict/ShrunkenDictionary.java | 160 -------- .../kylin/dict/ShrunkenDictionaryBuilder.java | 49 --- .../dict/lookup/AbstractLookupRowEncoder.java | 122 ------ .../kylin/dict/lookup/ExtTableSnapshotInfo.java | 149 ------- .../dict/lookup/ExtTableSnapshotInfoManager.java | 213 ---------- .../kylin/dict/lookup/LookupProviderFactory.java | 82 ---- .../dict/lookup/cache/RocksDBLookupBuilder.java | 83 ---- .../dict/lookup/cache/RocksDBLookupRowEncoder.java | 70 ---- .../dict/lookup/cache/RocksDBLookupTable.java | 116 ------ .../dict/lookup/cache/RocksDBLookupTableCache.java | 430 --------------------- .../apache/kylin/dict/ShrunkenDictionaryTest.java | 144 ------- .../lookup/cache/RocksDBLookupRowEncoderTest.java | 80 ---- .../lookup/cache/RocksDBLookupTableCacheTest.java | 231 ----------- .../dict/lookup/cache/RocksDBLookupTableTest.java | 161 -------- kylin-spark-project/kylin-spark-common/pom.xml | 2 +- 19 files changed, 4 insertions(+), 2142 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 8eb7887..6844d64 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -51,8 +51,6 @@ import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; -import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; import org.apache.kylin.dict.lookup.ILookupTable; import org.apache.kylin.dict.lookup.LookupProviderFactory; import org.apache.kylin.dict.lookup.SnapshotManager; @@ -544,11 +542,7 @@ public class CubeManager implements IRealizationProvider { String tableName = join.getPKSide().getTableIdentity(); CubeDesc cubeDesc = cubeSegment.getCubeDesc(); SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName); - if (snapshotTableDesc == null || !snapshotTableDesc.isExtSnapshotTable()) { - return getInMemLookupTable(cubeSegment, join, snapshotTableDesc); - } else { - return getExtLookupTable(cubeSegment, tableName, snapshotTableDesc); - } + return getInMemLookupTable(cubeSegment, join, snapshotTableDesc); } private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, @@ -567,16 +561,6 @@ public class CubeManager implements IRealizationProvider { } } - private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, - SnapshotTableDesc snapshotTableDesc) { - String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc); - - ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config) - .getSnapshot(snapshotResPath); - TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject()); - return LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot); - } - private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) { String snapshotResPath; if (snapshotTableDesc == null || !snapshotTableDesc.isGlobal()) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 67b7bdb..f74c49f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -111,10 +111,8 @@ public class DictionaryGeneratorCLI { TableRef table = dim.getTableRef(); if (cubeSeg.getModel().isLookupTable(table)) { // only the snapshot desc is not ext type, need to take snapshot - if (!cubeSeg.getCubeDesc().isExtSnapshotTable(table.getTableIdentity())) { - toSnapshot.add(table.getTableIdentity()); - toCheckLookup.add(table); - } + toSnapshot.add(table.getTableIdentity()); + toCheckLookup.add(table); } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index f0ec311..b8890cb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -1459,24 +1459,6 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { return desc.isGlobal(); } - public boolean isExtSnapshotTable(String tableName) { - SnapshotTableDesc desc = getSnapshotTableDesc(tableName); - if (desc == null) { - return false; - } - return desc.isExtSnapshotTable(); - } - - public List<String> getAllExtLookupSnapshotTypes() { - List<String> result = Lists.newArrayList(); - for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) { - if (snapshotTableDesc.isExtSnapshotTable()) { - result.add(snapshotTableDesc.getStorageType()); - } - } - return result; - } - public boolean isStreamingCube() { return getModel().getRootFactTable().getTableDesc().isStreamingTable(); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java index e61240b..cf3dbfb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java @@ -60,16 +60,4 @@ public class SnapshotTableDesc implements java.io.Serializable{ public void setGlobal(boolean global) { this.global = global; } - - public boolean isExtSnapshotTable() { - return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType); - } - - public boolean isEnableLocalCache() { - return enableLocalCache; - } - - public void setEnableLocalCache(boolean enableLocalCache) { - this.enableLocalCache = enableLocalCache; - } } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java deleted file mode 100644 index 584d58e..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Locale; -import java.util.Map; - -import org.apache.kylin.common.util.Dictionary; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; - -public class ShrunkenDictionary<T> extends Dictionary<T> { - - private ImmutableMap<T, Integer> valueToIdMap; - private ImmutableMap<Integer, T> idToValueMap; - - private int minId; - private int maxId; - private int sizeOfId; - private int sizeOfValue; - - private ValueSerializer<T> valueSerializer; - - public ShrunkenDictionary(ValueSerializer<T> valueSerializer) { // default constructor for Writable interface - this.valueSerializer = valueSerializer; - } - - public ShrunkenDictionary(ValueSerializer<T> valueSerializer, int minId, int maxId, int sizeOfId, int sizeOfValue, - Map<T, Integer> valueToIdMap) { - this.valueSerializer = valueSerializer; - - this.minId = minId; - this.maxId = maxId; - this.sizeOfId = sizeOfId; - this.sizeOfValue = sizeOfValue; - - Preconditions.checkNotNull(valueToIdMap); - this.valueToIdMap = ImmutableMap.<T, Integer> builder().putAll(valueToIdMap).build(); - } - - @Override - public int getMinId() { - return minId; - } - - @Override - public int getMaxId() { - return maxId; - } - - @Override - public int getSizeOfId() { - return sizeOfId; - } - - @Override - public int getSizeOfValue() { - return sizeOfValue; - } - - @Override - public boolean contains(Dictionary<?> another) { - return false; - } - - protected int getIdFromValueImpl(T value, int roundingFlag) { - Integer id = valueToIdMap.get(value); - if (id == null) { - return -1; - } - return id; - } - - protected T getValueFromIdImpl(int id) { - if (idToValueMap == null) { - idToValueMap = buildIdToValueMap(); - } - return idToValueMap.get(id); - } - - private ImmutableMap<Integer, T> buildIdToValueMap() { - ImmutableMap.Builder<Integer, T> idToValueMapBuilder = ImmutableMap.builder(); - for (T value : valueToIdMap.keySet()) { - idToValueMapBuilder.put(valueToIdMap.get(value), value); - } - return idToValueMapBuilder.build(); - } - - public void dump(PrintStream out) { - out.println(String.format(Locale.ROOT, "Total %d values for ShrunkenDictionary", valueToIdMap.size())); - } - - public void write(DataOutput out) throws IOException { - out.writeInt(minId); - out.writeInt(maxId); - out.writeInt(sizeOfId); - out.writeInt(sizeOfValue); - - out.writeInt(valueToIdMap.size()); - for (T value : valueToIdMap.keySet()) { - valueSerializer.writeValue(out, value); - out.writeInt(valueToIdMap.get(value)); - } - } - - public void readFields(DataInput in) throws IOException { - this.minId = in.readInt(); - this.maxId = in.readInt(); - this.sizeOfId = in.readInt(); - this.sizeOfValue = in.readInt(); - - int sizeValueMap = in.readInt(); - ImmutableMap.Builder<T, Integer> valueToIdMapBuilder = ImmutableMap.builder(); - for (int i = 0; i < sizeValueMap; i++) { - T value = valueSerializer.readValue(in); - int id = in.readInt(); - valueToIdMapBuilder.put(value, id); - } - this.valueToIdMap = valueToIdMapBuilder.build(); - } - - public interface ValueSerializer<T> { - void writeValue(DataOutput out, T value) throws IOException; - - T readValue(DataInput in) throws IOException; - } - - public static class StringValueSerializer implements ValueSerializer<String> { - @Override - public void writeValue(DataOutput out, String value) throws IOException { - out.writeUTF(value); - } - - @Override - public String readValue(DataInput in) throws IOException { - return in.readUTF(); - } - } -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java deleted file mode 100644 index ab3df5e..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict; - -import java.util.Map; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.dict.ShrunkenDictionary.ValueSerializer; - -import com.google.common.collect.Maps; - -public class ShrunkenDictionaryBuilder<T> { - - private Map<T, Integer> valueToIdMap; - - private Dictionary<T> fullDict; - - public ShrunkenDictionaryBuilder(Dictionary<T> fullDict) { - this.fullDict = fullDict; - - this.valueToIdMap = Maps.newHashMap(); - } - - public void addValue(T value) { - int id = fullDict.getIdFromValue(value); - valueToIdMap.put(value, id); - } - - public ShrunkenDictionary<T> build(ValueSerializer<T> valueSerializer) { - return new ShrunkenDictionary<>(valueSerializer, fullDict.getMinId(), fullDict.getMaxId(), - fullDict.getSizeOfId(), fullDict.getSizeOfValue(), valueToIdMap); - } -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java deleted file mode 100644 index 5efe129..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.dimension.DimensionEncoding; -import org.apache.kylin.metadata.model.TableDesc; - -/** - * Abstract encoder/decoder - * - */ -abstract public class AbstractLookupRowEncoder<R> { - protected ByteBuffer keyByteBuffer = ByteBuffer.allocate(1024 * 1024); - - protected int columnsNum; - protected int[] keyIndexes; - protected int[] valueIndexes; - - public AbstractLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) { - this.columnsNum = tableDesc.getColumns().length; - this.keyIndexes = new int[keyColumns.length]; - this.valueIndexes = new int[columnsNum - keyColumns.length]; - int keyIdx = 0; - int valIdx = 0; - for (int i = 0; i < columnsNum; i++) { - boolean isKeyColumn = false; - for (String keyColumn : keyColumns) { - if (keyColumn.equals(tableDesc.getColumns()[i].getName())) { - isKeyColumn = true; - break; - } - } - if (isKeyColumn) { - keyIndexes[keyIdx] = i; - keyIdx++; - } else { - valueIndexes[valIdx] = i; - valIdx++; - } - } - } - - abstract public R encode(String[] row); - - abstract public String[] decode(R result); - - public String[] getKeyData(String[] row) { - return extractColValues(row, keyIndexes); - } - - public String[] getValueData(String[] row) { - return extractColValues(row, valueIndexes); - } - - public byte[] encodeStringsWithLenPfx(String[] keys, boolean allowNull) { - keyByteBuffer.clear(); - for (String key : keys) { - if (key == null && !allowNull) { - throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys)); - } - byte[] byteKey = toBytes(key); - keyByteBuffer.putShort((short) byteKey.length); - keyByteBuffer.put(byteKey); - } - byte[] result = new byte[keyByteBuffer.position()]; - System.arraycopy(keyByteBuffer.array(), 0, result, 0, keyByteBuffer.position()); - return result; - } - - protected void decodeFromLenPfxBytes(byte[] rowKey, int[] valueIdx, String[] result) { - ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey); - for (int i = 0; i < valueIdx.length; i++) { - short keyLen = byteBuffer.getShort(); - byte[] keyBytes = new byte[keyLen]; - byteBuffer.get(keyBytes); - result[valueIdx[i]] = fromBytes(keyBytes); - } - } - - protected String[] extractColValues(String[] row, int[] indexes) { - String[] result = new String[indexes.length]; - int i = 0; - for (int idx : indexes) { - result[i++] = row[idx]; - } - return result; - } - - protected byte[] toBytes(String str) { - if (str == null) { - return new byte[] { DimensionEncoding.NULL }; - } - return Bytes.toBytes(str); - } - - protected String fromBytes(byte[] bytes) { - if (DimensionEncoding.isNull(bytes, 0, bytes.length)) { - return null; - } - return Bytes.toString(bytes); - } -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java deleted file mode 100644 index 80a2b77..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; - -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.source.IReadableTable.TableSignature; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -@SuppressWarnings("serial") -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class ExtTableSnapshotInfo extends RootPersistentEntity { - public static final String STORAGE_TYPE_HBASE = "hbase"; - - @JsonProperty("tableName") - private String tableName; - - @JsonProperty("signature") - private TableSignature signature; - - @JsonProperty("key_columns") - private String[] keyColumns; - - @JsonProperty("storage_type") - private String storageType; - - @JsonProperty("storage_location_identifier") - private String storageLocationIdentifier; - - @JsonProperty("shard_num") - private int shardNum; - - @JsonProperty("row_cnt") - private long rowCnt; - - @JsonProperty("last_build_time") - private long lastBuildTime; - - // default constructor for JSON serialization - public ExtTableSnapshotInfo() { - } - - public ExtTableSnapshotInfo(TableSignature signature, String tableName) throws IOException { - this.signature = signature; - this.tableName = tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - public String getResourcePath() { - return getResourcePath(tableName, uuid); - } - - public String getResourceDir() { - return getResourceDir(tableName); - } - - public static String getResourcePath(String tableName, String uuid) { - return getResourceDir(tableName) + "/" + uuid + ".snapshot"; - } - - public static String getResourceDir(String tableName) { - return ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT + "/" + tableName; - } - - public TableSignature getSignature() { - return signature; - } - - public String getStorageType() { - return storageType; - } - - public void setStorageType(String storageType) { - this.storageType = storageType; - } - - public String getStorageLocationIdentifier() { - return storageLocationIdentifier; - } - - public void setStorageLocationIdentifier(String storageLocationIdentifier) { - this.storageLocationIdentifier = storageLocationIdentifier; - } - - public String[] getKeyColumns() { - return keyColumns; - } - - public void setKeyColumns(String[] keyColumns) { - this.keyColumns = keyColumns; - } - - public int getShardNum() { - return shardNum; - } - - public void setShardNum(int shardNum) { - this.shardNum = shardNum; - } - - public String getTableName() { - return tableName; - } - - public void setSignature(TableSignature signature) { - this.signature = signature; - } - - public long getRowCnt() { - return rowCnt; - } - - public void setRowCnt(long rowCnt) { - this.rowCnt = rowCnt; - } - - public long getLastBuildTime() { - return lastBuildTime; - } - - public void setLastBuildTime(long lastBuildTime) { - this.lastBuildTime = lastBuildTime; - } - -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java deleted file mode 100644 index 4a84e33..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.List; -import java.util.NavigableSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.TableMetadataManager; -import org.apache.kylin.source.IReadableTable.TableSignature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -public class ExtTableSnapshotInfoManager { - - private static final Logger logger = LoggerFactory.getLogger(ExtTableSnapshotInfoManager.class); - public static Serializer<ExtTableSnapshotInfo> SNAPSHOT_SERIALIZER = new JsonSerializer<>(ExtTableSnapshotInfo.class); - - // static cached instances - private static final ConcurrentMap<KylinConfig, ExtTableSnapshotInfoManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, ExtTableSnapshotInfoManager>(); - - public static ExtTableSnapshotInfoManager getInstance(KylinConfig config) { - ExtTableSnapshotInfoManager r = SERVICE_CACHE.get(config); - if (r == null) { - synchronized (ExtTableSnapshotInfoManager.class) { - r = SERVICE_CACHE.get(config); - if (r == null) { - r = new ExtTableSnapshotInfoManager(config); - SERVICE_CACHE.put(config, r); - if (SERVICE_CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; - } - - public static void clearCache() { - synchronized (SERVICE_CACHE) { - SERVICE_CACHE.clear(); - } - } - - // ============================================================================ - - private KylinConfig config; - private LoadingCache<String, ExtTableSnapshotInfo> snapshotCache; // resource - - private ExtTableSnapshotInfoManager(KylinConfig config) { - this.config = config; - this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, ExtTableSnapshotInfo>() { - @Override - public void onRemoval(RemovalNotification<String, ExtTableSnapshotInfo> notification) { - ExtTableSnapshotInfoManager.logger.info("Snapshot with resource path " + notification.getKey() - + " is removed due to " + notification.getCause()); - } - }).maximumSize(1000)// - .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, ExtTableSnapshotInfo>() { - @Override - public ExtTableSnapshotInfo load(String key) throws Exception { - ExtTableSnapshotInfo snapshot = ExtTableSnapshotInfoManager.this.load(key); - return snapshot; - } - }); - } - - /** - * - * @param signature source table signature - * @param tableName - * @return latest snapshot info - * @throws IOException - */ - public ExtTableSnapshotInfo getLatestSnapshot(TableSignature signature, String tableName) throws IOException { - ExtTableSnapshotInfo snapshot = new ExtTableSnapshotInfo(signature, tableName); - snapshot.updateRandomUuid(); - ExtTableSnapshotInfo dupSnapshot = checkDupByInfo(snapshot); - return dupSnapshot; - } - - public ExtTableSnapshotInfo getSnapshot(String snapshotResPath) { - try { - return snapshotCache.get(snapshotResPath); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - public ExtTableSnapshotInfo getSnapshot(String tableName, String snapshotID) { - return getSnapshot(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID)); - } - - public List<ExtTableSnapshotInfo> getSnapshots(String tableName) throws IOException { - String tableSnapshotsPath = ExtTableSnapshotInfo.getResourceDir(tableName); - ResourceStore store = TableMetadataManager.getInstance(this.config).getStore(); - return store.getAllResources(tableSnapshotsPath, SNAPSHOT_SERIALIZER); - } - - public Set<String> getAllExtSnapshotResPaths() throws IOException { - Set<String> result = Sets.newHashSet(); - ResourceStore store = TableMetadataManager.getInstance(this.config).getStore(); - Set<String> snapshotTablePaths = store.listResources(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT); - if (snapshotTablePaths == null) { - return result; - } - for (String snapshotTablePath : snapshotTablePaths) { - Set<String> snapshotPaths = store.listResources(snapshotTablePath); - if (snapshotPaths != null) { - result.addAll(snapshotPaths); - } - } - return result; - } - - public void removeSnapshot(String tableName, String snapshotID) throws IOException { - String snapshotResPath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID); - snapshotCache.invalidate(snapshotResPath); - ResourceStore store = TableMetadataManager.getInstance(this.config).getStore(); - store.deleteResource(snapshotResPath); - } - - /** - * create ext table snapshot - * @param signature - * @param tableName - * @param keyColumns - *@param storageType - * @param storageLocation @return created snapshot - * @throws IOException - */ - public ExtTableSnapshotInfo createSnapshot(TableSignature signature, String tableName, String snapshotID, String[] keyColumns, - int shardNum, String storageType, String storageLocation) throws IOException { - ExtTableSnapshotInfo snapshot = new ExtTableSnapshotInfo(); - snapshot.setUuid(snapshotID); - snapshot.setSignature(signature); - snapshot.setTableName(tableName); - snapshot.setKeyColumns(keyColumns); - snapshot.setStorageType(storageType); - snapshot.setStorageLocationIdentifier(storageLocation); - snapshot.setShardNum(shardNum); - save(snapshot); - return snapshot; - } - - public void updateSnapshot(ExtTableSnapshotInfo extTableSnapshot) throws IOException { - save(extTableSnapshot); - snapshotCache.invalidate(extTableSnapshot.getResourcePath()); - } - - private ExtTableSnapshotInfo checkDupByInfo(ExtTableSnapshotInfo snapshot) throws IOException { - ResourceStore store = TableMetadataManager.getInstance(this.config).getStore(); - String resourceDir = snapshot.getResourceDir(); - NavigableSet<String> existings = store.listResources(resourceDir); - if (existings == null) - return null; - - TableSignature sig = snapshot.getSignature(); - for (String existing : existings) { - ExtTableSnapshotInfo existingSnapshot = load(existing); - // direct load from store - if (existingSnapshot != null && sig.equals(existingSnapshot.getSignature())) - return existingSnapshot; - } - return null; - } - - private ExtTableSnapshotInfo load(String resourcePath) throws IOException { - ResourceStore store = TableMetadataManager.getInstance(this.config).getStore(); - ExtTableSnapshotInfo snapshot = store.getResource(resourcePath, SNAPSHOT_SERIALIZER); - - return snapshot; - } - - public void save(ExtTableSnapshotInfo snapshot) throws IOException { - ResourceStore store = TableMetadataManager.getInstance(this.config).getStore(); - String path = snapshot.getResourcePath(); - store.checkAndPutResource(path, snapshot, SNAPSHOT_SERIALIZER); - } - -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java index 64ccef5..7ca83d0 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java @@ -19,94 +19,12 @@ package org.apache.kylin.dict.lookup; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.Map; - -import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.IReadableTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; public class LookupProviderFactory { - private static final Logger logger = LoggerFactory.getLogger(LookupProviderFactory.class); - private static Map<String, String> lookupProviderImplClassMap = Maps.newConcurrentMap(); - - static { - registerLookupProvider(ExtTableSnapshotInfo.STORAGE_TYPE_HBASE, - "org.apache.kylin.storage.hbase.lookup.HBaseLookupProvider"); - } - - public static void registerLookupProvider(String storageType, String implClassName) { - lookupProviderImplClassMap.put(storageType, implClassName); - } - - public static IExtLookupProvider getExtLookupProvider(String storageType) { - String className = lookupProviderImplClassMap.get(storageType); - if (className == null) { - throw new IllegalStateException("no implementation class found for storage type:" + storageType); - } - try { - Class clazz = Class.forName(className); - Constructor constructor = clazz.getConstructor(); - return (IExtLookupProvider) constructor.newInstance(); - } catch (ReflectiveOperationException e) { - throw new IllegalStateException("the lookup implementation class is invalid for storage type:" - + storageType, e); - } - } - public static ILookupTable getInMemLookupTable(TableDesc tableDesc, String[] pkCols, IReadableTable readableTable) throws IOException { return new LookupStringTable(tableDesc, pkCols, readableTable); } - - public static ILookupTable getExtLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) { - IExtLookupTableCache extLookupTableCache = getExtLookupProvider(extTableSnapshot.getStorageType()).getLocalCache(); - if (extLookupTableCache == null) { - return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot); - } - ILookupTable cachedLookupTable = extLookupTableCache.getCachedLookupTable(tableDesc, extTableSnapshot, true); - if (cachedLookupTable != null) { - logger.info("try to use cached lookup table:{}", extTableSnapshot.getResourcePath()); - return cachedLookupTable; - } - logger.info("use ext lookup table:{}", extTableSnapshot.getResourcePath()); - return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot); - } - - public static ILookupTable getExtLookupTableWithoutCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) { - IExtLookupProvider provider = getExtLookupProvider(extTableSnapshot.getStorageType()); - return provider.getLookupTable(tableDesc, extTableSnapshot); - } - - public static <T> T createEngineAdapter(String lookupStorageType, Class<T> engineInterface) { - IExtLookupProvider provider = getExtLookupProvider(lookupStorageType); - return provider.adaptToBuildEngine(engineInterface); - } - - public static void rebuildLocalCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) { - IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache(); - if (tablesCache != null) { - tablesCache.buildSnapshotCache(tableDesc, extTableSnapshotInfo, getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo)); - } - } - - public static void removeLocalCache(ExtTableSnapshotInfo extTableSnapshotInfo) { - IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache(); - if (tablesCache != null) { - tablesCache.removeSnapshotCache(extTableSnapshotInfo); - } - } - - public static CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) { - IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache(); - if (tablesCache != null) { - return tablesCache.getCacheState(extTableSnapshotInfo); - } - return CacheState.NONE; - } - } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java deleted file mode 100644 index 7408e21..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import java.io.File; - -import org.apache.commons.io.FileUtils; -import org.apache.kylin.dict.lookup.ILookupTable; -import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV; -import org.apache.kylin.metadata.model.TableDesc; -import org.rocksdb.CompactionStyle; -import org.rocksdb.CompressionType; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.util.SizeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocksDBLookupBuilder { - private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupBuilder.class); - - static { - RocksDB.loadLibrary(); - } - private Options options; - private String dbPath; - private TableDesc tableDesc; - private RocksDBLookupRowEncoder encoder; - private int writeBatchSize; - - public RocksDBLookupBuilder(TableDesc tableDesc, String[] keyColumns, String dbPath) { - this.tableDesc = tableDesc; - this.encoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns); - this.dbPath = dbPath; - this.writeBatchSize = 500; - this.options = new Options(); - options.setCreateIfMissing(true).setWriteBufferSize(8 * SizeUnit.KB).setMaxWriteBufferNumber(3) - .setMaxBackgroundCompactions(5).setCompressionType(CompressionType.SNAPPY_COMPRESSION) - .setCompactionStyle(CompactionStyle.UNIVERSAL); - - } - - public void build(ILookupTable srcLookupTable) { - File dbFolder = new File(dbPath); - if (dbFolder.exists()) { - logger.info("remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity()); - FileUtils.deleteQuietly(dbFolder); - } else { - logger.info("create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity()); - dbFolder.mkdirs(); - } - logger.info("start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath); - try (RocksDB rocksDB = RocksDB.open(options, dbPath)) { - // todo use batch may improve write performance - for (String[] row : srcLookupTable) { - KV kv = encoder.encode(row); - rocksDB.put(kv.getKey(), kv.getValue()); - } - } catch (RocksDBException e) { - logger.error("error when put data to rocksDB", e); - throw new RuntimeException("error when write data to rocks db", e); - } - - logger.info("source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath); - } -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java deleted file mode 100644 index 7455a7b..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder; -import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV; -import org.apache.kylin.metadata.model.TableDesc; - -/** - * encode/decode original table row to rocksDB KV - * - */ -public class RocksDBLookupRowEncoder extends AbstractLookupRowEncoder<KV>{ - - public RocksDBLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) { - super(tableDesc, keyColumns); - } - - public KV encode(String[] row) { - String[] keys = getKeyData(row); - String[] values = getValueData(row); - byte[] encodeKey = encodeStringsWithLenPfx(keys, false); - byte[] encodeValue = encodeStringsWithLenPfx(values, true); - - return new KV(encodeKey, encodeValue); - } - - public String[] decode(KV kv) { - String[] result = new String[columnsNum]; - - decodeFromLenPfxBytes(kv.key, keyIndexes, result); - decodeFromLenPfxBytes(kv.value, valueIndexes, result); - - return result; - } - - public static class KV { - private byte[] key; - private byte[] value; - - public KV(byte[] key, byte[] value) { - this.key = key; - this.value = value; - } - - public byte[] getKey() { - return key; - } - - public byte[] getValue() { - return value; - } - } -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java deleted file mode 100644 index 80e327d..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.kylin.common.util.Array; -import org.apache.kylin.dict.lookup.ILookupTable; -import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV; -import org.apache.kylin.metadata.model.TableDesc; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RocksDBLookupTable implements ILookupTable { - private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTable.class); - static { - RocksDB.loadLibrary(); - } - - private RocksDB rocksDB; - private Options options; - - private RocksDBLookupRowEncoder rowEncoder; - - public RocksDBLookupTable(TableDesc tableDesc, String[] keyColumns, String dbPath) { - this.options = new Options(); - this.rowEncoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns); - try { - this.rocksDB = RocksDB.openReadOnly(options, dbPath); - } catch (RocksDBException e) { - throw new IllegalStateException("cannot open rocks db in path:" + dbPath, e); - } - } - - @Override - public String[] getRow(Array<String> key) { - byte[] encodeKey = rowEncoder.encodeStringsWithLenPfx(key.data, false); - try { - byte[] value = rocksDB.get(encodeKey); - if (value == null) { - return null; - } - return rowEncoder.decode(new KV(encodeKey, value)); - } catch (RocksDBException e) { - throw new IllegalStateException("error when get key from rocksdb", e); - } - } - - @Override - public Iterator<String[]> iterator() { - final RocksIterator rocksIterator = getRocksIterator(); - rocksIterator.seekToFirst(); - - return new Iterator<String[]>() { - int counter; - - @Override - public boolean hasNext() { - boolean valid = rocksIterator.isValid(); - if (!valid) { - rocksIterator.close(); - } - return valid; - } - - @Override - public String[] next() { - counter++; - if (counter % 100000 == 0) { - logger.info("scanned {} rows from rocksDB", counter); - } - String[] result = rowEncoder.decode(new KV(rocksIterator.key(), rocksIterator.value())); - rocksIterator.next(); - return result; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("not support operation"); - } - }; - } - - private RocksIterator getRocksIterator() { - return rocksDB.newIterator(); - } - - @Override - public void close() throws IOException { - options.close(); - if (rocksDB != null) { - rocksDB.close(); - } - } -} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java deleted file mode 100644 index bbcaaf3..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java +++ /dev/null @@ -1,430 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.annotation.Nullable; - -import org.apache.commons.io.FileUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; -import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; -import org.apache.kylin.dict.lookup.IExtLookupTableCache; -import org.apache.kylin.dict.lookup.ILookupTable; -import org.apache.kylin.dict.lookup.LookupProviderFactory; -import org.apache.kylin.metadata.model.TableDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Charsets; -import com.google.common.base.Predicate; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.cache.Weigher; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - -public class RocksDBLookupTableCache implements IExtLookupTableCache { - private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTableCache.class); - - private static final String CACHE_TYPE_ROCKSDB = "rocksdb"; - private static final String STATE_FILE = "STATE"; - private static final String DB_FILE = "db"; - - private String basePath; - private long maxCacheSizeInKB; - private Cache<String, CachedTableInfo> tablesCache; - - private ConcurrentMap<String, Boolean> inBuildingTables = Maps.newConcurrentMap(); - - private ExecutorService cacheBuildExecutor; - private ScheduledExecutorService cacheStateCheckExecutor; - private CacheStateChecker cacheStateChecker; - - // static cached instances - private static final ConcurrentMap<KylinConfig, RocksDBLookupTableCache> SERVICE_CACHE = new ConcurrentHashMap<>(); - - public static RocksDBLookupTableCache getInstance(KylinConfig config) { - RocksDBLookupTableCache r = SERVICE_CACHE.get(config); - if (r == null) { - synchronized (RocksDBLookupTableCache.class) { - r = SERVICE_CACHE.get(config); - if (r == null) { - r = new RocksDBLookupTableCache(config); - SERVICE_CACHE.put(config, r); - if (SERVICE_CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; - } - - public static void clearCache() { - synchronized (SERVICE_CACHE) { - SERVICE_CACHE.clear(); - } - } - - // ============================================================================ - - private KylinConfig config; - - private RocksDBLookupTableCache(KylinConfig config) { - this.config = config; - init(); - } - - private void init() { - this.basePath = getCacheBasePath(config); - - this.maxCacheSizeInKB = (long) (config.getExtTableSnapshotLocalCacheMaxSizeGB() * 1024 * 1024); - this.tablesCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, CachedTableInfo>() { - @Override - public void onRemoval(RemovalNotification<String, CachedTableInfo> notification) { - logger.warn(notification.getValue() + " is removed " + "because of " + notification.getCause()); - notification.getValue().cleanStorage(); - } - }).maximumWeight(maxCacheSizeInKB).weigher(new Weigher<String, CachedTableInfo>() { - @Override - public int weigh(String key, CachedTableInfo value) { - return value.getSizeInKB(); - } - }).build(); - restoreCacheState(); - cacheStateChecker = new CacheStateChecker(); - initExecutors(); - } - - protected static String getCacheBasePath(KylinConfig config) { - String basePath = config.getExtTableSnapshotLocalCachePath(); - if ((!basePath.startsWith("/")) && (KylinConfig.getKylinHome() != null)) { - basePath = KylinConfig.getKylinHome() + File.separator + basePath; - } - return basePath + File.separator + CACHE_TYPE_ROCKSDB; - } - - private void restoreCacheState() { - File dbBaseFolder = new File(basePath); - if (!dbBaseFolder.exists()) { - dbBaseFolder.mkdirs(); - } - Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(dbBaseFolder); - for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) { - for (File snapshotFolder : tableSnapshotsEntry.getValue()) { - initSnapshotState(tableSnapshotsEntry.getKey(), snapshotFolder); - } - } - } - - private Map<String, File[]> getCachedTableSnapshotsFolders(File dbBaseFolder) { - Map<String, File[]> result = Maps.newHashMap(); - File[] tableFolders = dbBaseFolder.listFiles(new FileFilter() { - @Override - public boolean accept(File file) { - return file.isDirectory(); - } - }); - if (tableFolders == null) { - return result; - } - - for (File tableFolder : tableFolders) { - String tableName = tableFolder.getName(); - File[] snapshotFolders = tableFolder.listFiles(new FileFilter() { - @Override - public boolean accept(File snapshotFile) { - return snapshotFile.isDirectory(); - } - }); - result.put(tableName, snapshotFolders); - } - return result; - } - - private void initSnapshotState(String tableName, File snapshotCacheFolder) { - String snapshotID = snapshotCacheFolder.getName(); - File stateFile = getCacheStateFile(snapshotCacheFolder.getAbsolutePath()); - if (stateFile.exists()) { - try { - String stateStr = Files.toString(stateFile, Charsets.UTF_8); - String resourcePath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID); - if (CacheState.AVAILABLE.name().equals(stateStr)) { - tablesCache.put(resourcePath, new CachedTableInfo(snapshotCacheFolder.getAbsolutePath())); - } - } catch (IOException e) { - logger.error("error to read state file:" + stateFile.getAbsolutePath()); - } - } - } - - private void initExecutors() { - this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread")); - this.cacheStateCheckExecutor = Executors - .newSingleThreadScheduledExecutor(new NamedThreadFactory("lookup-cache-state-checker")); - cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60L, TimeUnit.SECONDS); // check every 10 minutes - } - - @Override - public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, - boolean buildIfNotExist) { - String resourcePath = extTableSnapshotInfo.getResourcePath(); - if (inBuildingTables.containsKey(resourcePath)) { - logger.info("cache is in building for snapshot:" + resourcePath); - return null; - } - CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath); - if (cachedTableInfo == null) { - if (buildIfNotExist) { - buildSnapshotCache(tableDesc, extTableSnapshotInfo, - getSourceLookupTable(tableDesc, extTableSnapshotInfo)); - } - logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath()); - return null; - } - - String[] keyColumns = extTableSnapshotInfo.getKeyColumns(); - String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId()); - return new RocksDBLookupTable(tableDesc, keyColumns, dbPath); - } - - private ILookupTable getSourceLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) { - return LookupProviderFactory.getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo); - } - - @Override - public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, - final ILookupTable sourceTable) { - if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) { - logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building", - extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize()); - return; - } - final String[] keyColumns = extTableSnapshotInfo.getKeyColumns(); - final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), - extTableSnapshotInfo.getId()); - final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId()); - final String snapshotResPath = extTableSnapshotInfo.getResourcePath(); - - if (inBuildingTables.containsKey(snapshotResPath)) { - logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath); - return; - } - if (inBuildingTables.putIfAbsent(snapshotResPath, true) == null) { - cacheBuildExecutor.submit(new Runnable() { - @Override - public void run() { - try { - RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, keyColumns, dbPath); - builder.build(sourceTable); - saveSnapshotCacheState(extTableSnapshotInfo, cachePath); - } catch (Exception e) { - logger.error("error when build snapshot cache", e); - } finally { - inBuildingTables.remove(snapshotResPath); - } - } - }); - - } else { - logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath); - } - } - - @Override - public void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo) { - tablesCache.invalidate(extTableSnapshotInfo.getResourcePath()); - } - - @Override - public CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) { - String resourcePath = extTableSnapshotInfo.getResourcePath(); - if (inBuildingTables.containsKey(resourcePath)) { - return CacheState.IN_BUILDING; - } - File stateFile = getCacheStateFile( - getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId())); - if (!stateFile.exists()) { - return CacheState.NONE; - } - try { - String stateString = Files.toString(stateFile, Charsets.UTF_8); - return CacheState.valueOf(stateString); - } catch (IOException e) { - logger.error("error when read state file", e); - } - return CacheState.NONE; - } - - public long getTotalCacheSize() { - return FileUtils.sizeOfDirectory(new File(getCacheBasePath(config))); - } - - public void checkCacheState() { - cacheStateChecker.run(); - } - - private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) { - File stateFile = getCacheStateFile( - getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId())); - try { - Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8); - tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath)); - } catch (IOException e) { - throw new RuntimeException( - "error when write cache state for snapshot:" + extTableSnapshotInfo.getResourcePath()); - } - } - - private File getCacheStateFile(String snapshotCacheFolder) { - String stateFilePath = snapshotCacheFolder + File.separator + STATE_FILE; - return new File(stateFilePath); - } - - protected String getSnapshotStorePath(String tableName, String snapshotID) { - return getSnapshotCachePath(tableName, snapshotID) + File.separator + DB_FILE; - } - - protected String getSnapshotCachePath(String tableName, String snapshotID) { - return basePath + File.separator + tableName + File.separator + snapshotID; - } - - private class CacheStateChecker implements Runnable { - - @Override - public void run() { - try { - String cacheBasePath = getCacheBasePath(config); - logger.info("check snapshot local cache state, local path:{}", cacheBasePath); - File baseFolder = new File(cacheBasePath); - if (!baseFolder.exists()) { - return; - } - Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(baseFolder); - List<Pair<String, File>> allCachedSnapshots = Lists.newArrayList(); - for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) { - String tableName = tableSnapshotsEntry.getKey(); - for (File file : tableSnapshotsEntry.getValue()) { - String snapshotID = file.getName(); - allCachedSnapshots - .add(new Pair<>(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID), file)); - } - } - - final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config) - .getAllExtSnapshotResPaths(); - - List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList( - FluentIterable.from(allCachedSnapshots).filter(new Predicate<Pair<String, File>>() { - @Override - public boolean apply(@Nullable Pair<String, File> input) { - long lastModified = input.getSecond().lastModified(); - return !activeSnapshotSet.contains(input.getFirst()) && lastModified > 0 - && lastModified < (System.currentTimeMillis() - - config.getExtTableSnapshotLocalCacheCheckVolatileRange()); - } - })); - for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) { - File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond(); - logger.info("removed cache file:{}, it is not referred by any cube", - snapshotCacheFolder.getAbsolutePath()); - try { - FileUtils.deleteDirectory(snapshotCacheFolder); - } catch (IOException e) { - logger.error("fail to remove folder:" + snapshotCacheFolder.getAbsolutePath(), e); - } - tablesCache.invalidate(toRemovedCachedSnapshot.getFirst()); - } - } catch (Exception e) { - logger.error("error happens when check cache state", e); - } - - } - } - - private static class CachedTableInfo { - private String cachePath; - - private long dbSize; - - public CachedTableInfo(String cachePath) { - this.cachePath = cachePath; - this.dbSize = FileUtils.sizeOfDirectory(new File(cachePath)); - } - - public int getSizeInKB() { - return (int) (dbSize / 1024); - } - - public void cleanStorage() { - logger.info("clean cache storage for path:" + cachePath); - try { - FileUtils.deleteDirectory(new File(cachePath)); - } catch (IOException e) { - logger.error("file delete fail:" + cachePath, e); - } - } - } - - /** - * A simple named thread factory. - */ - private static class NamedThreadFactory implements ThreadFactory { - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - public NamedThreadFactory(String threadPrefix) { - final SecurityManager s = System.getSecurityManager(); - this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.namePrefix = threadPrefix + "-"; - } - - @Override - public Thread newThread(Runnable r) { - final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - t.setDaemon(true); - return t; - } - } -} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java deleted file mode 100644 index b2981d4..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -public class ShrunkenDictionaryTest { - private static List<String> testData; - private static Dictionary originDict; - private static Dictionary shrunkenDict; - - @BeforeClass - public static void setUp() { - LocalFileMetadataTestCase.staticCreateTestMetadata(); - prepareTestData(); - } - - @AfterClass - public static void after() { - LocalFileMetadataTestCase.staticCleanupTestMetadata(); - } - - @Test - public void testStringDictionary() { - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - - shrunkenDict.write(dos); - - ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - DataInputStream dis = new DataInputStream(bis); - - ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer(); - Dictionary<String> dShrunkenDict = new ShrunkenDictionary<>(valueSerializer); - dShrunkenDict.readFields(dis); - - for (int i = 0; i < testData.size(); i += 2) { - String value = testData.get(i); - Assert.assertEquals(originDict.getIdFromValue(value), dShrunkenDict.getIdFromValue(value)); - } - } catch (IOException e) { - } - } - - @Test - public void testGetMinId() { - assertEquals(0, shrunkenDict.getMinId()); - } - - @Test - public void testGetMaxId() { - assertEquals(6, shrunkenDict.getMaxId()); - } - - @Test - public void testGetSizeOfId() { - assertEquals(1, shrunkenDict.getSizeOfId()); - } - - @Test - public void testGetSizeOfValue() { - assertEquals(9, shrunkenDict.getSizeOfValue()); - } - - @Test - public void testContains() { - assertFalse(shrunkenDict.contains(originDict)); - } - - @Test - public void testGetValueFromIdImpl() { - for (int i = 0; i < testData.size(); i += 2) { - assertEquals(testData.get(i), shrunkenDict.getValueFromId(originDict.getIdFromValue(testData.get(i)))); - } - } - - private static void prepareTestData() { - testData = new ArrayList<>(); - testData.add(""); - testData.add("part"); - testData.add("par"); - testData.add("partition"); - testData.add("party"); - testData.add("parties"); - testData.add("paint"); - - originDict = constructOriginDict(); - ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer(); - shrunkenDict = constructShrunkenDict(originDict, valueSerializer); - } - - private static Dictionary constructOriginDict() { - TrieDictionaryBuilder<String> dictBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter()); - for (String str : testData) { - dictBuilder.addValue(str); - } - Dictionary<String> dict = dictBuilder.build(0); - return dict; - } - - private static Dictionary constructShrunkenDict(Dictionary dictionary, - ShrunkenDictionary.ValueSerializer valueSerializer) { - ShrunkenDictionaryBuilder<String> shrunkenDictBuilder = new ShrunkenDictionaryBuilder<>(dictionary); - for (int i = 0; i < testData.size(); i += 2) { - System.out.println(testData.get(i)); - shrunkenDictBuilder.addValue(testData.get(i)); - } - Dictionary<String> shrunkenDict = shrunkenDictBuilder.build(valueSerializer); - return shrunkenDict; - } -} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java deleted file mode 100644 index ff5fdff..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNull; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV; -import org.apache.kylin.metadata.TableMetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class RocksDBLookupRowEncoderTest extends LocalFileMetadataTestCase { - private TableDesc tableDesc; - - @Before - public void setup() throws Exception { - this.createTestMetadata(); - TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default"); - } - - @After - public void after() throws Exception { - this.cleanupTestMetadata(); - } - - @Test - public void testEnDeCode() { - RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }); - String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" }; - KV kv = lookupRowEncoder.encode(row); - - String[] decodeRow = lookupRowEncoder.decode(kv); - assertArrayEquals(row, decodeRow); - } - - @Test - public void testEnDeCodeWithNullValue() { - RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }); - String[] row = new String[] { "AD", "42.546245", "1.601554", null }; - KV kv = lookupRowEncoder.encode(row); - - String[] decodeRow = lookupRowEncoder.decode(kv); - assertNull(decodeRow[3]); - assertArrayEquals(row, decodeRow); - } - - @Test - public void testEnDeCodeWithMultiKeys() { - RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY", - "NAME" }); - String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" }; - KV kv = lookupRowEncoder.encode(row); - - String[] decodeRow = lookupRowEncoder.decode(kv); - assertArrayEquals(row, decodeRow); - } - -} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java deleted file mode 100644 index fa430fa..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.Random; - -import org.apache.commons.io.FileUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.RandomUtil; -import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; -import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; -import org.apache.kylin.dict.lookup.IExtLookupProvider; -import org.apache.kylin.dict.lookup.IExtLookupTableCache; -import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState; -import org.apache.kylin.dict.lookup.ILookupTable; -import org.apache.kylin.dict.lookup.LookupProviderFactory; -import org.apache.kylin.metadata.TableMetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.IReadableTable.TableSignature; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; - -/** - */ -public class RocksDBLookupTableCacheTest extends LocalFileMetadataTestCase { - private static final String TABLE_COUNTRY = "DEFAULT.TEST_COUNTRY"; - private static final String MOCK_EXT_LOOKUP = "mock"; - private TableDesc tableDesc; - private KylinConfig kylinConfig; - - @Before - public void setup() throws Exception { - createTestMetadata(); - TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - kylinConfig = getTestConfig(); - tableDesc = metadataManager.getTableDesc(TABLE_COUNTRY, "default"); - cleanCache(); - LookupProviderFactory.registerLookupProvider(MOCK_EXT_LOOKUP, MockedLookupProvider.class.getName()); - } - - private void cleanCache() { - FileUtils.deleteQuietly(new File(kylinConfig.getExtTableSnapshotLocalCachePath())); - } - - @After - public void tearDown() { - cleanupTestMetadata(); - cleanCache(); - } - - @Test - public void testBuildTableCache() throws Exception { - String snapshotID = RandomUtil.randomUUID().toString(); - ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(snapshotID, 10000); - assertEquals(CacheState.AVAILABLE, RocksDBLookupTableCache.getInstance(kylinConfig).getCacheState(snapshotInfo)); - } - - private ExtTableSnapshotInfo buildSnapshotCache(String snapshotID, int rowCnt) throws Exception { - ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo(); - snapshotInfo.setTableName(TABLE_COUNTRY); - snapshotInfo.setUuid(snapshotID); - snapshotInfo.setStorageType(MOCK_EXT_LOOKUP); - snapshotInfo.setKeyColumns(new String[] { "COUNTRY" }); - snapshotInfo.setRowCnt(rowCnt); - snapshotInfo.setSignature(new TableSignature("/test", rowCnt, System.currentTimeMillis())); - - ExtTableSnapshotInfoManager.getInstance(kylinConfig).save(snapshotInfo); - RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig); - cache.buildSnapshotCache(tableDesc, snapshotInfo, getLookupTableWithRandomData(rowCnt)); - - while (cache.getCacheState(snapshotInfo) == CacheState.IN_BUILDING) { - Thread.sleep(500); - } - return snapshotInfo; - } - - @Test - public void testRestoreCacheFromFiles() throws Exception { - String snapshotID = RandomUtil.randomUUID().toString(); - String snapshotCacheBasePath = RocksDBLookupTableCache.getCacheBasePath(kylinConfig) + File.separator - + TABLE_COUNTRY + File.separator + snapshotID; - String dbPath = snapshotCacheBasePath + File.separator + "db"; - RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, dbPath); - builder.build(getLookupTableWithRandomData(10000)); - String stateFilePath = snapshotCacheBasePath + File.separator + "STATE"; - Files.write(CacheState.AVAILABLE.name(), new File(stateFilePath), Charsets.UTF_8); - - RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig); - ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo(); - snapshotInfo.setTableName(TABLE_COUNTRY); - snapshotInfo.setUuid(snapshotID); - snapshotInfo.setStorageType(MOCK_EXT_LOOKUP); - snapshotInfo.setKeyColumns(new String[] { "COUNTRY" }); - ILookupTable lookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false); - int rowCnt = 0; - for (String[] strings : lookupTable) { - rowCnt++; - } - lookupTable.close(); - assertEquals(10000, rowCnt); - } - - @Test - public void testEvict() throws Exception { - kylinConfig.setProperty("kylin.snapshot.ext.local.cache.max-size-gb", "0.005"); - int snapshotNum = 10; - int snapshotRowCnt = 100000; - for (int i = 0; i < snapshotNum; i++) { - buildSnapshotCache(RandomUtil.randomUUID().toString(), snapshotRowCnt); - } - assertTrue(RocksDBLookupTableCache.getInstance(kylinConfig).getTotalCacheSize() < 0.006 * 1024 * 1024 * 1024); - } - - @Test - public void testCheckCacheState() throws Exception { - ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(RandomUtil.randomUUID().toString(), 1000); - RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig); - ILookupTable cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false); - assertNotNull(cachedLookupTable); - cachedLookupTable.close(); - - ExtTableSnapshotInfoManager.getInstance(kylinConfig).removeSnapshot(snapshotInfo.getTableName(), snapshotInfo.getId()); - cache.checkCacheState(); - String cacheLocalPath = cache.getSnapshotCachePath(snapshotInfo.getTableName(), snapshotInfo.getId()); - // won't cleanup because it is newly created in last 1 hour - assertTrue(new File(cacheLocalPath).exists()); - - // change the volatile value - kylinConfig.setProperty("kylin.snapshot.ext.local.cache.check.volatile", "0"); - cache.checkCacheState(); - // this time it should be removed. - assertFalse(new File(cacheLocalPath).exists()); - - cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false); - assertNull(cachedLookupTable); - } - - public static class MockedLookupProvider implements IExtLookupProvider { - - @Override - public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) { - return getLookupTableWithRandomData(extTableSnapshot.getRowCnt()); - } - - @Override - public IExtLookupTableCache getLocalCache() { - return null; - } - - @Override - public <I> I adaptToBuildEngine(Class<I> engineInterface) { - return null; - } - } - - private static ILookupTable getLookupTableWithRandomData(final long rowNum) { - return new ILookupTable() { - Random random = new Random(); - - @Override - public String[] getRow(Array<String> key) { - return new String[0]; - } - - @Override - public void close() throws IOException { - - } - - @Override - public Iterator<String[]> iterator() { - return new Iterator<String[]>() { - private int iterCnt = 0; - - @Override - public boolean hasNext() { - return iterCnt < rowNum; - } - - @Override - public String[] next() { - iterCnt++; - return genRandomRow(iterCnt); - } - - @Override - public void remove() { - - } - }; - } - - private String[] genRandomRow(int id) { - return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()), - String.valueOf(random.nextDouble()), "Andorra" + id }; - } - }; - } - -} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java deleted file mode 100644 index d5bbd40..0000000 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup.cache; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Random; - -import org.apache.commons.io.FileUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.dict.lookup.ILookupTable; -import org.apache.kylin.metadata.TableMetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -/** - */ -public class RocksDBLookupTableTest extends LocalFileMetadataTestCase { - - private TableDesc tableDesc; - private RocksDBLookupTable lookupTable; - private Random random; - private int sourceRowNum; - - @Before - public void setup() throws Exception { - createTestMetadata(); - TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - this.random = new Random(); - tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default"); - sourceRowNum = 10000; - genTestData(); - lookupTable = new RocksDBLookupTable(tableDesc, new String[] { "COUNTRY" }, "lookup_cache/TEST_COUNTRY"); - } - - private void genTestData() { - removeTestDataIfExists(); - File file = new File("lookup_cache/TEST_COUNTRY"); - file.mkdirs(); - RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, - "lookup_cache/TEST_COUNTRY"); - long start = System.currentTimeMillis(); - builder.build(getLookupTableWithRandomData(sourceRowNum)); - long take = System.currentTimeMillis() - start; - System.out.println("take:" + take + " ms to complete build"); - } - - private void removeTestDataIfExists() { - FileUtils.deleteQuietly(new File("lookup_cache")); - } - - @After - public void tearDown() throws IOException { - cleanupTestMetadata(); - removeTestDataIfExists(); - lookupTable.close(); - } - - @Test - public void testIterator() throws Exception { - System.out.println("start iterator table"); - long start = System.currentTimeMillis(); - Iterator<String[]> iter = lookupTable.iterator(); - int count = 0; - while (iter.hasNext()) { - iter.next(); - count++; - } - long take = System.currentTimeMillis() - start; - System.out.println("scan " + count + " rows, take " + take + " ms"); - - } - - @Test - public void testGet() throws Exception { - int getNum = 3000; - List<String[]> keys = Lists.newArrayList(); - for (int i = 0; i < getNum; i++) { - String[] keyi = new String[] { "keyyyyy" + random.nextInt(sourceRowNum) }; - keys.add(keyi); - } - - long start = System.currentTimeMillis(); - for (int i = 0; i < getNum; i++) { - String[] row = lookupTable.getRow(new Array<>(keys.get(i))); - if (row == null) { - System.out.println("null value for key:" + Arrays.toString(keys.get(i))); - } - } - long take = System.currentTimeMillis() - start; - System.out.println("muliti get " + getNum + " rows, take " + take + " ms"); - } - - private ILookupTable getLookupTableWithRandomData(final int rowNum) { - return new ILookupTable() { - @Override - public String[] getRow(Array<String> key) { - return new String[0]; - } - - @Override - public void close() throws IOException { - - } - - @Override - public Iterator<String[]> iterator() { - return new Iterator<String[]>() { - private int iterCnt = 0; - - @Override - public boolean hasNext() { - return iterCnt < rowNum; - } - - @Override - public String[] next() { - iterCnt++; - return genRandomRow(iterCnt); - } - - @Override - public void remove() { - - } - }; - } - }; - } - - private String[] genRandomRow(int id) { - return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()), String.valueOf(random.nextDouble()), - "Andorra" + id }; - } - -} diff --git a/kylin-spark-project/kylin-spark-common/pom.xml b/kylin-spark-project/kylin-spark-common/pom.xml index b4317ce..58b48c0 100644 --- a/kylin-spark-project/kylin-spark-common/pom.xml +++ b/kylin-spark-project/kylin-spark-common/pom.xml @@ -22,7 +22,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <name>Apche Kylin on parquet - Common</name> + <name>Apache Kylin 4.X - Common</name> <packaging>jar</packaging> <artifactId>kylin-spark-common</artifactId> <version>4.0.0-SNAPSHOT</version>