This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4f3d8e70124349b121e920b93a0683303014ef11 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Fri Sep 20 23:23:42 2019 +0800 KYLIN-4141 Build Global Dictionary in no time Currently, realtime OLAP do not support COUNT_DISTINCT(bitmap) because of lack the ability of encoding string at once. Here I want to use RocksDB & HBase as implementation of streaming distributed dictionary to do that. --- .../org/apache/kylin/common/KylinConfigBase.java | 7 + stream-core/pom.xml | 5 + .../stream/core/dict/LocalDictionaryStore.java | 144 ++++++++++++++ .../stream/core/dict/RemoteDictionaryStore.java | 215 ++++++++++++++++++++ .../core/dict/StreamingDictionaryClient.java | 216 +++++++++++++++++++++ .../core/dict/StreamingDistributedDictionary.java | 91 +++++++++ .../core/storage/IStreamingSegmentStore.java | 5 + .../core/storage/StreamingSegmentManager.java | 45 ++++- .../storage/columnar/ColumnarSegmentStore.java | 12 ++ .../core/storage/columnar/SegmentMemoryStore.java | 12 +- .../kylin/stream/server/StreamingServer.java | 1 + 11 files changed, 748 insertions(+), 5 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9d6dc41..a0a1d38 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2224,6 +2224,13 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.stream.metrics.option", ""); } + /** + * Whether to log the integer id that the real-time dictionary assigns to each string value + * (used for COUNT DISTINCT); intended for debug/test purposes only. + * + * @return the value of kylin.stream.print-realtime-dict-enabled, defaulting to false + */ + public boolean isPrintRealtimeDictEnabled() { + final String printDictEnabled = getOptional("kylin.stream.print-realtime-dict-enabled", "false"); + return Boolean.parseBoolean(printDictEnabled); + } + public long getStreamMetricsInterval() { return Long.parseLong(getOptional("kylin.stream.metrics.interval", "5")); } diff --git a/stream-core/pom.xml 
b/stream-core/pom.xml index e6fa05c..4cd1dcd 100644 --- a/stream-core/pom.xml +++ b/stream-core/pom.xml @@ -54,6 +54,11 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.curator</groupId> diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/LocalDictionaryStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/LocalDictionaryStore.java new file mode 100644 index 0000000..f84ea0c --- /dev/null +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/LocalDictionaryStore.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.stream.core.dict; + +import com.google.common.base.Preconditions; +import org.apache.kylin.common.util.ByteArray; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.util.SizeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_FOR_EXCEPTION; +import static org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_UNKNOWN; + +/** + * Process-local dictionary cache backed by RocksDB: one RocksDB instance per store directory and + * one column family per encoded column, mapping a string value to its dictId (stored as the decimal + * text of the id). + * + * Please refer to https://github.com/facebook/rocksdb/wiki/RocksJava-Basics + */ +public class LocalDictionaryStore implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(LocalDictionaryStore.class); + private RocksDB db; + private final File dictPath; + private final String baseStorePath = "DictCache"; + private final Map<ByteArray, ColumnFamilyHandle> columnFamilyHandleMap = new HashMap<>(); + String cubeName; + + /** + * @param tableColumn name of this store; its files live under DictCache/<tableColumn>, a path + * relative to the working directory + */ + public LocalDictionaryStore(String tableColumn) { + this.dictPath = new File(baseStorePath, tableColumn); + this.cubeName = tableColumn; + } + + /** + * Opens (creating if missing) the RocksDB instance with one column family per entry of cfs. + * + * @param cfs column family names; RocksDB needs the "default" family to be listed + * @throws Exception if RocksDB cannot be opened (logged, then rethrown) + */ + public void init(String[] cfs) throws Exception { + logger.debug("Checking streaming dict local store for {} at {}.", cubeName, String.join(", ", cfs)); + // warn only when the directory is missing AND could not be created + if (!dictPath.exists() && !dictPath.mkdirs()) { + logger.warn("Create {} failed.", dictPath); + } + // maybe following options is naive, should improve in the future + try (DBOptions options = new DBOptions() + .setCreateIfMissing(true) + .setCreateMissingColumnFamilies(true) + .setMaxBackgroundCompactions(5) + .setWritableFileMaxBufferSize(400 * SizeUnit.KB)) { + String dataPath = dictPath.getAbsolutePath() + "/data"; + List<ColumnFamilyDescriptor> columnFamilyDescriptorList = new ArrayList<>(); 
+ // filled in by RocksDB.open; the loop below relies on it following the order of the descriptors + List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>(); + for (String family : cfs) { + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + family.getBytes(StandardCharsets.UTF_8)); + columnFamilyDescriptorList.add(columnFamilyDescriptor); + } + logger.debug("Try to open rocksdb {}.", dataPath); + db = RocksDB.open(options, dataPath, columnFamilyDescriptorList, columnFamilyHandleList); + Preconditions.checkNotNull(db, "RocksDB cannot created for some reasons."); + for (int i = 0; i < columnFamilyHandleList.size(); i++) { + columnFamilyHandleMap.put(new ByteArray(cfs[i].getBytes(StandardCharsets.UTF_8)), + columnFamilyHandleList.get(i)); + } + } catch (Exception e) { + logger.error("Init rocks db failed.", e); + throw e; + } + logger.debug("Init local dict succeed."); + } + + /** + * Caches key -> value in the column family of column; the value is stored as its decimal text. + * + * @return true on success, false if the column family is unknown or RocksDB failed (the error is logged) + */ + public boolean put(ByteArray column, String key, Integer value) { + try { + ColumnFamilyHandle handle = columnFamilyHandleMap.get(column); + Preconditions.checkNotNull(handle, + new String(column.array(), StandardCharsets.UTF_8) + " cannot find matched handle."); + db.put(handle, key.getBytes(StandardCharsets.UTF_8), value.toString().getBytes(StandardCharsets.UTF_8)); + return true; + } catch (Exception rdbe) { + logger.error("Put failed.", rdbe); + return false; + } + } + + /** + * Looks up the locally cached dictId of value in the column family of column. 
+ * + * @return ID_UNKNOWN if not cached locally (or cached as an empty value); ID_FOR_EXCEPTION if RocksDB + * failed or the stored bytes are not a decimal integer; otherwise the cached dictId + */ + public int encode(ByteArray column, String value) { + byte[] values; + try { + ColumnFamilyHandle handle = columnFamilyHandleMap.get(column); + Preconditions.checkNotNull(handle, + new String(column.array(), StandardCharsets.UTF_8) + " cannot find matched handle."); + values = db.get(handle, value.getBytes(StandardCharsets.UTF_8)); + } catch (Exception rdbe) { + logger.error("Can not get from rocksDB.", rdbe); + return ID_FOR_EXCEPTION; + } + if (values != null) { + if (values.length == 0) { + logger.warn("Empty values for {}", value); + return ID_UNKNOWN; + } else { + try { + return Integer.parseInt(new String(values, StandardCharsets.UTF_8)); + } catch (NumberFormatException e) { + logger.error("parseInt " + new ByteArray(values).toString(), e); + return ID_FOR_EXCEPTION; + } + } + } + return ID_UNKNOWN; + } + + /** + * Releases the column family handles, then the RocksDB instance. Safe to call more than once and + * when init never opened the database. + */ + @Override + public void close() { + logger.debug("Close rocks db."); + for (ColumnFamilyHandle familyHandle : columnFamilyHandleMap.values()) { + familyHandle.close(); + } + columnFamilyHandleMap.clear(); + if (db != null) { + db.close(); + db = null; + } + } +} diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/RemoteDictionaryStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/RemoteDictionaryStore.java new file mode 100644 index 0000000..0d65090 --- /dev/null +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/RemoteDictionaryStore.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.stream.core.dict; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedLock; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.HadoopUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_FOR_EMPTY_STR; +import static org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_FOR_EXCEPTION; +import static org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_UNKNOWN; + +/** + * Uses HBase as the remote dictionary store (one table per cube, one column family per column). + * The HBase table is never dropped automatically; drop it manually when it is no longer needed. 
+ */ +public class RemoteDictionaryStore { + private static final Logger logger = LoggerFactory.getLogger(RemoteDictionaryStore.class); + + private final byte[] hbaseTableName; + private final String tableName; + private final byte[] encodeQualifierName = "encode_value".getBytes(StandardCharsets.UTF_8); + private final byte[] tsQualifierName = "ts".getBytes(StandardCharsets.UTF_8); + private Table table; + private final boolean printValue = KylinConfig.getInstanceFromEnv().isPrintRealtimeDictEnabled(); + + /** + * @param cubeName used as the HBase table name + */ + public RemoteDictionaryStore(String cubeName) { + hbaseTableName = cubeName.getBytes(StandardCharsets.UTF_8); + tableName = cubeName; + } + + /** + * Creates the HBase table with one column family per entry of cfs when it does not exist yet (only while + * holding the distributed lock), then opens the table for reads and writes. + * + * @throws IOException on HBase admin/table errors + */ + public void init(String[] cfs) throws IOException { + logger.debug("Checking streaming remote store for {} at {}.", tableName, String.join(", ", cfs)); + Connection conn = getConnection(); + Admin admin = conn.getAdmin(); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(hbaseTableName)); + for (String family : cfs) { + HColumnDescriptor fd = new HColumnDescriptor(family); + desc.addFamily(fd); + } + DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentProcess(); + try { + boolean locked = lock.lock(lockPath()); + if (locked && !admin.tableExists(TableName.valueOf(hbaseTableName))) { + logger.info("Create htable with {}.", desc); + admin.createTable(desc); + } else { + logger.info("Table exists or cannot fetch lock {}", desc); + } + } finally { + try { + admin.close(); + } finally { + // release the lock even when closing the admin fails + if (lock != null && lock.isLockedByMe(lockPath())) { + lock.unlock(lockPath()); + } + } + } + table = conn.getTable(TableName.valueOf(hbaseTableName)); + } + + /** + * <pre> + * 1. 
when size of rowkeyStr is zero, return ID_FOR_EMPTY_STR (nothing is written) + * 2. when checkPrevious set to true + * 1. when rowkeyStr exists in HBase and related value equals to expectedValue, put putValue and return putValue + * 2. else return ID_UNKNOWN + * 3. when checkPrevious set to false + * 1. when rowkeyStr not exists in HBase, put putValue and return putValue 
+ * 2. else return ID_UNKNOWN + * 4. unchecked (non IOException) exceptions propagate to the caller + * 5. when meet IOException, retry forever, sleeping retryTimes seconds (at most 10s) between attempts; + * an interrupt during that sleep does not stop the retries, but the interrupt status is restored before returning + * </pre> + */ + public int checkAndPutWithRetry(ByteArray columnFamily, String rowkeyStr, int expectedValue, int putValue, + boolean checkPrevious) { + int retryTimes = 0; + boolean interrupted = false; + try { + while (true) { + try { + return checkAndPut(columnFamily, rowkeyStr, expectedValue, putValue, checkPrevious); + } catch (IOException e) { + logger.error("CheckAndPut failed at " + rowkeyStr + ", columnFamily " + + new String(columnFamily.array(), StandardCharsets.UTF_8), e); + retryTimes++; + interrupted |= backoff(retryTimes); + } + } + } finally { + if (interrupted) { + // re-assert the interrupt swallowed by backoff() so the caller can still observe it + Thread.currentThread().interrupt(); + } + } + } + + /** + * Single attempt of checkAndPutWithRetry: writes putValue (as decimal text) to the encode_value qualifier + * and the current time (Bytes.toBytes(long)) to the ts qualifier, conditionally on encode_value. + */ + int checkAndPut(ByteArray columnFamily, String rowkeyStr, int expectedValue, int putValue, boolean checkPrevious) + throws IOException { + byte[] rowkey = rowkeyStr.getBytes(StandardCharsets.UTF_8); + if (rowkey.length == 0) { + return ID_FOR_EMPTY_STR; + } + byte[] valueByte = Integer.toString(putValue).getBytes(StandardCharsets.UTF_8); + Put put = new Put(rowkey); + put.addColumn(columnFamily.array(), encodeQualifierName, valueByte); + put.addColumn(columnFamily.array(), tsQualifierName, Bytes.toBytes(System.currentTimeMillis())); + // a null expected value asks HBase to put only if the encode_value cell does not exist yet + boolean hasPut = table.checkAndPut(rowkey, columnFamily.array(), encodeQualifierName, + checkPrevious ? 
+ Integer.toString(expectedValue).getBytes(StandardCharsets.UTF_8) : null, put); + if (hasPut) { + if (printValue) { + logger.debug("Encode {} to {}", rowkeyStr, putValue); + } + return putValue; + } else { + return ID_UNKNOWN; + } + } + + /** + * Reads the dictId of rowkeyStr, retrying forever on IOException with the same back-off and interrupt + * handling as checkAndPutWithRetry. + */ + public int encodeWithRetry(ByteArray column, String rowkeyStr) { + int retryTimes = 0; + boolean interrupted = false; + try { + while (true) { + try { + return encode(column, rowkeyStr); + } catch (IOException e) { + logger.error("Encode failed at " + rowkeyStr + ", column " + + new String(column.array(), StandardCharsets.UTF_8), e); + retryTimes++; + interrupted |= backoff(retryTimes); + } + } + } finally { + if (interrupted) { + // re-assert the interrupt swallowed by backoff() so the caller can still observe it + Thread.currentThread().interrupt(); + } + } + } + + /** + * Sleeps before the next retry: retryTimes seconds, capped at 10 seconds. + * + * @return true if the sleep was interrupted (the thread's interrupt status is then cleared) + */ + private static boolean backoff(int retryTimes) { + long sleep = 1000L * Math.min(retryTimes, 10); + logger.debug("Sleep to wait set succeed for {} ms.", sleep); + try { + Thread.sleep(sleep); + return false; + } catch (InterruptedException ie) { + return true; + } + } + + /** + * Get encode integer from remote dictionary store. 
+ * + * @return ID_FOR_EMPTY_STR for an empty rowkeyStr, otherwise the dictId stored for it + * @throws IOException on HBase read errors + * @throws IllegalStateException if rowkeyStr has no encode_value cell in this column family + */ + int encode(ByteArray column, String rowkeyStr) throws IOException { + byte[] rowkey = rowkeyStr.getBytes(StandardCharsets.UTF_8); + if (rowkey.length == 0) { + return ID_FOR_EMPTY_STR; + } + Get get = new Get(rowkey); + Result res = table.get(get); + byte[] resBytes = res.getValue(column.array(), encodeQualifierName); + if (resBytes == null) { + throw new IllegalStateException("No encoded value for " + rowkeyStr + " in column " + + new String(column.array(), StandardCharsets.UTF_8)); + } + String realId = new String(resBytes, StandardCharsets.UTF_8); + if (printValue) { + // ts is written with Bytes.toBytes(long), so decode it as a long rather than as UTF-8 text + byte[] tsBytes = res.getValue(column.array(), tsQualifierName); + Object ts = tsBytes != null && tsBytes.length == Long.BYTES ? Bytes.toLong(tsBytes) : "unknown"; + logger.debug("Encode {} to {} [{}]", rowkeyStr, realId, ts); + } + return Integer.parseInt(realId); + } + + /** + * Creates a new HBase connection from the current Hadoop configuration. + * + * @throws IllegalStateException if the connection cannot be created + */ + static Connection getConnection() { + Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); + try { + return ConnectionFactory.createConnection(conf); + } catch (IOException ioe) { + throw new IllegalStateException("Cannot connect to HBase.", ioe); + } + } + + /** Path of the distributed lock guarding creation of this cube's HBase table. 
*/ + private String lockPath() { + return "/realtime/create_global_dict_table/" + tableName; + } +} \ No newline at end of file diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDictionaryClient.java b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDictionaryClient.java new file mode 100644 index 0000000..c9d4412 --- /dev/null +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDictionaryClient.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.stream.core.dict; + +import org.apache.kylin.common.Closeable; +import org.apache.kylin.common.util.ByteArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicLong; + +/** + * <pre> + * Encodes strings into integers in real time for each cube, backed by both the local and the remote dict store. + * One table/dir for each cube, one column family for each column in cube. + * Not thread-safe. + * + * Each receiver asks for the allocation of a range of integers used to encode strings. The range is [startDictId, startDictId + DICT_ID_RANGE_LEN); + * [startDictId, startDictId + offset) is already used as dictIds, [startDictId + offset, startDictId + DICT_ID_RANGE_LEN) is not used yet. + * + * ID_FOR_EMPTY_STR is for the empty string; ids from MIN_ID_FOR_NO_EMPTY_STR to MAX_ID_FOR_NO_EMPTY_STR are for non-empty strings. 
+ * </pre> + */ +public class StreamingDictionaryClient implements Closeable { + + private static Logger logger = LoggerFactory.getLogger(StreamingDictionaryClient.class); + + private static final ByteArray defaultCf = new ByteArray("default".getBytes(StandardCharsets.UTF_8)); + private static final String GLOBAL_START_ID = "GLOBAL_START_ID"; + public static final String MSG_TEMPLATE = "Dictionary Client Info: ExceptionLocal:%9d, ExceptionRemote:%9d, HitLocal:%9d, MissLocal:%9d, ALL:%9d ."; + + public static final int DICT_ID_RANGE_LEN = 30000; + + public static final int ID_UNKNOWN = -2; + public static final int ID_FOR_EXCEPTION = -1; + public static final int ID_FOR_EMPTY_STR = 0; + public static final int MIN_ID_FOR_NO_EMPTY_STR = 1; + public static final int MAX_ID_FOR_NO_EMPTY_STR = Integer.MAX_VALUE - DICT_ID_RANGE_LEN - 10; + + private int startDictId = -1; + private int offset = 0; + + /** Local stats for cache hit/miss/error */ + private AtomicLong encodeCounter = new AtomicLong(); + private AtomicLong hitLocal = new AtomicLong(); + private AtomicLong missLocal = new AtomicLong(); + private AtomicLong errorLocal = new AtomicLong(); + private AtomicLong errorRemote = new AtomicLong(); + private long lastCheck = System.currentTimeMillis(); + private long lastTotalError = 0; + + /** Local Storage Layer*/ + private LocalDictionaryStore localStore; + + /** Remote Storage Layer*/ + private RemoteDictionaryStore remoteStore; + + public StreamingDictionaryClient(String cubeName, String[] columns) { + String[] columnFamily = getCf(columns); + try { + localStore = new LocalDictionaryStore(cubeName); + remoteStore = new RemoteDictionaryStore(cubeName); + remoteStore.init(columnFamily); + localStore.init(columnFamily); + } catch (Exception e) { + throw new RuntimeException("Init dictionary failed.", e); + } + } + + /** + * Encoded a string into integer. + * Not thread-safe. 
+ */ + public int encode(ByteArray column, String value) { + // pre check + checkDictIdRange(); + printStat(); + + // fetch from local + int localId = localStore.encode(column, value); + int expected = startDictId + offset; + if (localId >= ID_FOR_EMPTY_STR) { + hitLocal.addAndGet(1); + return localId; + } + + // fetch from remote + int remoteId; + if (localId == ID_FOR_EXCEPTION) { + errorLocal.addAndGet(1); + } else { + missLocal.addAndGet(1); + } + + remoteId = remoteStore.checkAndPutWithRetry(column, value, ID_UNKNOWN, expected, false); + if (remoteId == ID_FOR_EXCEPTION) { + // should be better here + errorRemote.addAndGet(1); + return MIN_ID_FOR_NO_EMPTY_STR; + } else if (remoteId == ID_UNKNOWN) { + // remote exists, fetch from remote + remoteId = remoteStore.encodeWithRetry(column, value); + } else { + // remote not exists, put to remote, advance offset + offset++; + } + + // set back to local cache + if (remoteId > ID_FOR_EMPTY_STR && !localStore.put(column, value, remoteId)) { + errorLocal.addAndGet(1); + } + return remoteId; + } + + //===================================================================================== + //================================= Internal method =================================== + + void checkDictIdRange() { + // init startDictId + if (startDictId == -1) { + logger.debug("Init dict range."); + int res = remoteStore.checkAndPutWithRetry(defaultCf, GLOBAL_START_ID, MIN_ID_FOR_NO_EMPTY_STR, + MIN_ID_FOR_NO_EMPTY_STR, false); + if (res != ID_UNKNOWN) { + logger.debug("First dictId in global."); + startDictId = MIN_ID_FOR_NO_EMPTY_STR; + } else { + startDictId = findStartId(); + logger.debug("After allcated, current startDictId is {}.", startDictId); + } + } + + // need to ask for another range + if (offset >= DICT_ID_RANGE_LEN - 1) { + logger.debug("Ask for another dictId range. 
Current startDictId is {}.", startDictId); + startDictId = findStartId(); + logger.debug("After allcated, current startDictId is {}.", startDictId); + offset = 0; + } + + if (startDictId >= MAX_ID_FOR_NO_EMPTY_STR) { + // do something here to fix overflow + } + } + + /** + * Try to find a exclusive dictId range for current process. + */ + int findStartId() { + int finalV = ID_UNKNOWN; + int oldV = remoteStore.encodeWithRetry(defaultCf, GLOBAL_START_ID); + boolean hasPut = false; + while (!hasPut) { + int res = remoteStore.checkAndPutWithRetry(defaultCf, GLOBAL_START_ID, oldV, oldV + DICT_ID_RANGE_LEN, + true); + if (res == ID_UNKNOWN) { // put failed + oldV = remoteStore.encodeWithRetry(defaultCf, GLOBAL_START_ID); + } else { // put success + finalV = res; + hasPut = true; + } + } + return finalV; + } + + /** + * Create column family for each column in one cube + */ + private String[] getCf(String[] columns) { + String[] cfs = new String[columns.length + 1]; + cfs[0] = "default"; // RocksDB need it + int idx = 1; + for (String col : columns) { + cfs[idx++] = col; + } + return cfs; + } + + private void printStat() { + long curr = encodeCounter.addAndGet(1); + if (System.currentTimeMillis() - lastCheck >= 10000) { + long totalError = errorRemote.get() + errorLocal.get(); + String msg = String.format(Locale.ROOT, MSG_TEMPLATE, errorLocal.get(), errorRemote.get(), hitLocal.get(), + missLocal.get(), curr); + if (totalError > lastTotalError) { + logger.warn("Exception in dict\n {}", msg); + lastTotalError = totalError; + } else { + logger.info(msg); + } + lastCheck = System.currentTimeMillis(); + } + } + + @Override + public void close() { + localStore.close(); + } +} diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDistributedDictionary.java b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDistributedDictionary.java new file mode 100644 index 0000000..8b4eca7 --- /dev/null +++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDistributedDictionary.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.stream.core.dict; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Dictionary; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; + +/** + * Encode string into integer in real-time and distributed way. 
+ */ +public class StreamingDistributedDictionary extends Dictionary<String> { + + private ByteArray cubeColumn; + private transient StreamingDictionaryClient streamingDictionaryClient; + + public StreamingDistributedDictionary(String cubeColumn, StreamingDictionaryClient streamingDictionaryClient) { + this.streamingDictionaryClient = streamingDictionaryClient; + this.cubeColumn = new ByteArray(cubeColumn.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public int getMinId() { + return Integer.MIN_VALUE; + } + + @Override + public int getMaxId() { + return Integer.MAX_VALUE; + } + + @Override + public int getSizeOfId() { + return 0; + } + + @Override + public int getSizeOfValue() { + return 0; + } + + @Override + public boolean contains(Dictionary<?> another) { + return false; + } + + @Override + protected int getIdFromValueImpl(String value, int roundingFlag) { + return streamingDictionaryClient.encode(cubeColumn, value); + } + + @Override + protected String getValueFromIdImpl(int id) { + return ""; + } + + @Override + public void dump(PrintStream out) { + throw new UnsupportedOperationException("Do not dump me."); + } + + @Override + public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("Do not copy me."); + } + + @Override + public void readFields(DataInput in) { + throw new UnsupportedOperationException("Do not read me."); + } +} diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java index a5e082b..adc49a5 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java @@ -20,7 +20,10 @@ package org.apache.kylin.stream.core.storage; import java.io.File; import java.io.IOException; +import java.util.Map; +import 
org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.stream.core.model.StreamingMessage; import org.apache.kylin.stream.core.model.stats.SegmentStoreStats; import org.apache.kylin.stream.core.query.IStreamingGTSearcher; @@ -30,6 +33,8 @@ public interface IStreamingSegmentStore extends IStreamingGTSearcher { int addEvent(StreamingMessage event); + default void addExternalDict(Map<TblColRef, Dictionary<String>> dictMap){} + File getStorePath(); void persist(); diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java index fa9b9ab..580e054 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java @@ -26,21 +26,31 @@ import java.lang.reflect.Constructor; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TimeZone; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.measure.bitmap.BitmapMeasureType; +import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentRange.TSRange; +import org.apache.kylin.metadata.model.TblColRef; import 
org.apache.kylin.stream.core.consumer.IConsumerProvider; import org.apache.kylin.stream.core.consumer.StreamingConsumerChannel; +import org.apache.kylin.stream.core.dict.StreamingDictionaryClient; +import org.apache.kylin.stream.core.dict.StreamingDistributedDictionary; import org.apache.kylin.stream.core.model.StreamingMessage; import org.apache.kylin.stream.core.model.stats.LongLatencyInfo; import org.apache.kylin.stream.core.model.stats.SegmentStats; @@ -61,6 +71,7 @@ public class StreamingSegmentManager implements Closeable { private static Logger logger = LoggerFactory.getLogger(StreamingSegmentManager.class); private final String cubeName; private final CubeInstance cubeInstance; + private AtomicBoolean closed = new AtomicBoolean(false); /** * Cube window defines how streaming events are divided and put into different segments , for example 1 hour per segment(indexer). @@ -105,6 +116,8 @@ public class StreamingSegmentManager implements Closeable { private AtomicLong dropCounts = new AtomicLong(); private volatile long latestEventTime = 0; private volatile long latestEventIngestTime = 0; + private Map<TblColRef, Dictionary<String>> dictionaryMap = new HashMap<>(); + private StreamingDictionaryClient streamingDictionaryClient; public StreamingSegmentManager(String baseStorePath, CubeInstance cubeInstance, ISourcePositionHandler sourcePosHandler, IConsumerProvider consumerProvider) { this.baseStorePath = baseStorePath; @@ -125,6 +138,27 @@ public class StreamingSegmentManager implements Closeable { this.longLatencyInfo = new LongLatencyInfo(); this.checkPointStore = new CheckPointStore(cubeName, cubeDataFolder, cubeInstance.getConfig() .getStreamingCheckPointFileMaxNum()); + + // Prepare for realtime dictionary encoder. 
+ CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + List<MeasureDesc> bitmapMeasureList = cube.getDescriptor().getMeasures().stream() + .filter(measureDesc -> measureDesc.getFunction().getMeasureType() instanceof BitmapMeasureType) + .collect(Collectors.toList()); + if (!bitmapMeasureList.isEmpty()) { + List<String> realtimeDictColumn = bitmapMeasureList.stream() + .map(measureDesc -> measureDesc.getFunction().getParameter().getColRef().getIdentity()) + .collect(Collectors.toList()); + String str = String.join(", ", realtimeDictColumn); + logger.info("Find these columns {} need to be encoded realtime.", str); + List<TblColRef> tblColRefs = bitmapMeasureList.stream() + .map(measureDesc -> measureDesc.getFunction().getParameter().getColRef()) + .collect(Collectors.toList()); + + streamingDictionaryClient = new StreamingDictionaryClient(cubeName, + realtimeDictColumn.toArray(new String[0])); + tblColRefs.forEach(col -> dictionaryMap.put(col, + new StreamingDistributedDictionary(col.getIdentity(), streamingDictionaryClient))); + } } public void addEvent(StreamingMessage event) { @@ -365,6 +399,7 @@ public class StreamingSegmentManager implements Closeable { Constructor<IStreamingSegmentStore> constructor = clazz.getConstructor(String.class, CubeInstance.class, String.class); segmentStore = constructor.newInstance(baseStorePath, cubeInstance, segmentName); + segmentStore.addExternalDict(dictionaryMap); } catch (Exception e) { logger.warn("Fail to construct an instance for " + storeClassName + ". 
Will use the default store: ColumnarSegmentStore"); @@ -556,6 +591,10 @@ public class StreamingSegmentManager implements Closeable { @Override public void close() { + if (closed.get()){ + logger.debug("Already close it, skip."); + return; + } logger.warn("Closing Streaming Cube store, cubeName={}", cubeName); checkpoint(); logger.warn("{} ingested {} , dropped {}, long latency {}", cubeName, ingestCount.get(), dropCounts.get(), @@ -567,6 +606,10 @@ public class StreamingSegmentManager implements Closeable { logger.error("fail to close cube segment, segment :" + cubesegment.getSegmentName(), e); } } + if (streamingDictionaryClient != null) { + streamingDictionaryClient.close(); + } + closed.set(true); } public synchronized void checkpoint() { diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java index 7d61e52..14a92f7 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java @@ -24,6 +24,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -35,7 +36,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.io.FileUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.stream.core.exception.IllegalStorageException; import org.apache.kylin.stream.core.metrics.StreamingMetrics; import 
org.apache.kylin.stream.core.model.StreamingMessage; @@ -92,6 +95,8 @@ public class ColumnarSegmentStore implements IStreamingSegmentStore { private List<DataSegmentFragment> fragments = Lists.newCopyOnWriteArrayList(); protected int latestCheckpointFragment = 0; + private Map<TblColRef, Dictionary<String>> dictionaryMap; + public ColumnarSegmentStore(String baseStorePath, CubeInstance cubeInstance, String segmentName) { this.maxRowsInMemory = cubeInstance.getConfig().getStreamingIndexMaxRows(); this.baseStorePath = baseStorePath; @@ -139,6 +144,12 @@ public class ColumnarSegmentStore implements IStreamingSegmentStore { } @Override + public void addExternalDict(Map<TblColRef, Dictionary<String>> dictMap) { + this.dictionaryMap = dictMap; + this.activeMemoryStore.setDictionaryMap(dictMap); + } + + @Override public File getStorePath() { return dataSegmentFolder; } @@ -163,6 +174,7 @@ public class ColumnarSegmentStore implements IStreamingSegmentStore { newFragment = createNewFragment(); persistingMemoryStore = activeMemoryStore; activeMemoryStore = new SegmentMemoryStore(parsedStreamingCubeInfo, segmentName); + activeMemoryStore.setDictionaryMap(dictionaryMap); } finally { persistWriteLock.unlock(); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java index 1b0bb6a..243fd05 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java @@ -28,7 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.measure.MeasureAggregator; import 
org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.topn.TopNAggregator; @@ -71,6 +71,12 @@ public class SegmentMemoryStore implements IStreamingGTSearcher { private long minEventTime = Long.MAX_VALUE; private long maxEventTime = 0; + private Map<TblColRef, Dictionary<String>> dictionaryMap; + + public void setDictionaryMap(Map<TblColRef, Dictionary<String>> dictionaryMap) { + this.dictionaryMap = dictionaryMap; + } + public SegmentMemoryStore(ParsedStreamingCubeInfo parsedStreamingCubeInfo, String segmentName) { this.parsedStreamingCubeInfo = parsedStreamingCubeInfo; this.segmentName = segmentName; @@ -84,7 +90,6 @@ public class SegmentMemoryStore implements IStreamingGTSearcher { StringArrayComparator.INSTANCE)); } } - } public int index(StreamingMessage event) { @@ -162,8 +167,7 @@ public class SegmentMemoryStore implements IStreamingGTSearcher { } inputToMeasure[i] = value; } - - return parsedStreamingCubeInfo.measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, null); + return parsedStreamingCubeInfo.measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); } @SuppressWarnings("unchecked") diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 408a5e7a..95630c6 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -608,6 +608,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis StreamingSegmentManager segmentManager = getStreamingSegmentManager(cubeName); if (segmentManager != null) { streamingSegmentManagerMap.remove(cubeName); + segmentManager.close(); segmentManager.purgeAllSegments(); } }