This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4f3d8e70124349b121e920b93a0683303014ef11
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Fri Sep 20 23:23:42 2019 +0800

    KYLIN-4141 Build Global Dictionary in no time
    
    Currently, real-time OLAP does not support COUNT_DISTINCT (bitmap) because it
    lacks the ability to encode strings into integers on the fly. This change uses
    RocksDB and HBase to implement a streaming distributed dictionary that does so.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +
 stream-core/pom.xml                                |   5 +
 .../stream/core/dict/LocalDictionaryStore.java     | 144 ++++++++++++++
 .../stream/core/dict/RemoteDictionaryStore.java    | 215 ++++++++++++++++++++
 .../core/dict/StreamingDictionaryClient.java       | 216 +++++++++++++++++++++
 .../core/dict/StreamingDistributedDictionary.java  |  91 +++++++++
 .../core/storage/IStreamingSegmentStore.java       |   5 +
 .../core/storage/StreamingSegmentManager.java      |  45 ++++-
 .../storage/columnar/ColumnarSegmentStore.java     |  12 ++
 .../core/storage/columnar/SegmentMemoryStore.java  |  12 +-
 .../kylin/stream/server/StreamingServer.java       |   1 +
 11 files changed, 748 insertions(+), 5 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9d6dc41..a0a1d38 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2224,6 +2224,13 @@ public abstract class KylinConfigBase implements 
Serializable {
         return getOptional("kylin.stream.metrics.option", "");
     }
 
+    /**
+     * whether to print encode integer value for count distinct string value, 
only for debug/test purpose
+     */
+    public boolean isPrintRealtimeDictEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.stream.print-realtime-dict-enabled", 
"false"));
+    }
+
     public long getStreamMetricsInterval() {
         return Long.parseLong(getOptional("kylin.stream.metrics.interval", 
"5"));
     }
diff --git a/stream-core/pom.xml b/stream-core/pom.xml
index e6fa05c..4cd1dcd 100644
--- a/stream-core/pom.xml
+++ b/stream-core/pom.xml
@@ -54,6 +54,11 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.curator</groupId>
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/LocalDictionaryStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/LocalDictionaryStore.java
new file mode 100644
index 0000000..f84ea0c
--- /dev/null
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/LocalDictionaryStore.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.core.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.ByteArray;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_FOR_EXCEPTION;
+import static 
org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_UNKNOWN;
+
+/**
+ * Please refer to https://github.com/facebook/rocksdb/wiki/RocksJava-Basics
+ */
+public class LocalDictionaryStore implements Closeable {
+    private static Logger logger = 
LoggerFactory.getLogger(LocalDictionaryStore.class);
+    private RocksDB db;
+    private File dictPath;
+    private String baseStorePath = "DictCache";
+    private Map<ByteArray, ColumnFamilyHandle> columnFamilyHandleMap = new 
HashMap<>();
+    String cubeName;
+
+    public LocalDictionaryStore(String tableColumn) {
+        this.dictPath = new File(baseStorePath, tableColumn);
+        this.cubeName = tableColumn;
+    }
+
+    public void init(String[] cfs) throws Exception {
+        logger.debug("Checking streaming dict local store for {} at {}.", 
cubeName, String.join(", ", cfs));
+        if (!dictPath.exists() && dictPath.mkdirs()) {
+            logger.warn("Create {} failed.", dictPath);
+        }
+        // maybe following options is naive, should improve in the future
+        try (DBOptions options = new DBOptions()
+                .setCreateIfMissing(true)
+                .setCreateMissingColumnFamilies(true)
+                .setMaxBackgroundCompactions(5)
+                .setWritableFileMaxBufferSize(400 * SizeUnit.KB)) {
+            String dataPath = dictPath.getAbsolutePath() + "/data";
+            List<ColumnFamilyDescriptor> columnFamilyDescriptorList = new 
ArrayList<>();
+            List<ColumnFamilyHandle> columnFamilyHandleList = new 
ArrayList<>(); // to be fill in
+            for (String family : cfs) {
+                ColumnFamilyDescriptor columnFamilyDescriptor = new 
ColumnFamilyDescriptor(
+                        family.getBytes(StandardCharsets.UTF_8));
+                columnFamilyDescriptorList.add(columnFamilyDescriptor);
+            }
+            logger.debug("Try to open rocksdb {}.", dataPath);
+            db = RocksDB.open(options, dataPath, columnFamilyDescriptorList, 
columnFamilyHandleList);
+            Preconditions.checkNotNull(db, "RocksDB cannot created for some 
reasons.");
+            for (int i = 0; i < columnFamilyHandleList.size(); i++) {
+                columnFamilyHandleMap.put(new 
ByteArray(cfs[i].getBytes(StandardCharsets.UTF_8)),
+                        columnFamilyHandleList.get(i));
+            }
+        } catch (Exception e) {
+            logger.error("Init rocks db failed.", e);
+            throw e;
+        }
+        logger.debug("Init local dict succeed.");
+    }
+
+    public boolean put(ByteArray column, String key, Integer value) {
+        try {
+            ColumnFamilyHandle handle = columnFamilyHandleMap.get(column);
+            Preconditions.checkNotNull(handle,
+                    new String(column.array(), StandardCharsets.UTF_8) + " 
cannot find matched handle.");
+            db.put(handle, key.getBytes(StandardCharsets.UTF_8), 
value.toString().getBytes(StandardCharsets.UTF_8));
+            return true;
+        } catch (Exception rdbe) {
+            logger.error("Put failed.", rdbe);
+            return false;
+        }
+    }
+
+    /**
+     * Try get dictId for value.
+     *
+     * @return ID_UNKNOWN if not exists locally; ID_FOR_EXCEPTION for 
Exception;
+     *      other positive value for real dictId
+     */
+    public int encode(ByteArray column, String value) {
+        byte[] values;
+        try {
+            ColumnFamilyHandle handle = columnFamilyHandleMap.get(column);
+            Preconditions.checkNotNull(handle,
+                    new String(column.array(), StandardCharsets.UTF_8) + " 
cannot find matched handle.");
+            values = db.get(handle, value.getBytes(StandardCharsets.UTF_8));
+        } catch (Exception rdbe) {
+            logger.error("Can not get from rocksDB.", rdbe);
+            return ID_FOR_EXCEPTION;
+        }
+        if (values != null) {
+            if (values.length == 0) {
+                logger.warn("Empty values for {}", value);
+                return ID_UNKNOWN;
+            } else {
+                try {
+                    return Integer.parseInt(new String(values, 
StandardCharsets.UTF_8));
+                } catch (Exception e) {
+                    logger.error("parseInt " + new 
ByteArray(values).toString(), e);
+                    return ID_FOR_EXCEPTION;
+                }
+            }
+        }
+        return ID_UNKNOWN;
+    }
+
+    @Override
+    public void close() {
+        logger.debug("Close rocks db.");
+        for (ColumnFamilyHandle familyHandle : columnFamilyHandleMap.values()) 
{
+            familyHandle.close();
+        }
+        db.close();
+    }
+}
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/RemoteDictionaryStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/RemoteDictionaryStore.java
new file mode 100644
index 0000000..0d65090
--- /dev/null
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/RemoteDictionaryStore.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.core.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedLock;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_FOR_EMPTY_STR;
+import static 
org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_FOR_EXCEPTION;
+import static 
org.apache.kylin.stream.core.dict.StreamingDictionaryClient.ID_UNKNOWN;
+
+/**
+ * Used HBase as remote dictionary store
+ * Need drop this table manually when you don't need it.
+ */
+public class RemoteDictionaryStore {
+    private static Logger logger = 
LoggerFactory.getLogger(RemoteDictionaryStore.class);
+
+    private final byte[] hbaseTableName;
+    private final String tableName;
+    private final byte[] encodeQualifierName = 
"encode_value".getBytes(StandardCharsets.UTF_8);
+    private final byte[] tsQualifierName = 
"ts".getBytes(StandardCharsets.UTF_8);
+    private Table table;
+    private boolean printValue = 
KylinConfig.getInstanceFromEnv().isPrintRealtimeDictEnabled();
+
+    public RemoteDictionaryStore(String cubeName) {
+        hbaseTableName = cubeName.getBytes(StandardCharsets.UTF_8);
+        tableName = cubeName;
+    }
+
+    public void init(String[] cfs) throws IOException {
+        logger.debug("Checking streaming remote store for {} at {}.", 
tableName, String.join(", ", cfs));
+        Connection conn = getConnection();
+        Admin admin = conn.getAdmin();
+        HTableDescriptor desc = new 
HTableDescriptor(TableName.valueOf(hbaseTableName));
+        for (String family : cfs) {
+            HColumnDescriptor fd = new HColumnDescriptor(family);
+            desc.addFamily(fd);
+        }
+        DistributedLock lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentProcess();
+        try {
+            boolean locked = lock.lock(lockPath());
+            if (locked && 
!admin.tableExists(TableName.valueOf(hbaseTableName))) {
+                logger.info("Create htable with {}.", desc);
+                admin.createTable(desc);
+            } else {
+                logger.info("Table exists or cannot fetch lock {}", desc);
+            }
+        } finally {
+            admin.close();
+            if (lock != null && lock.isLockedByMe(lockPath())) {
+                lock.unlock(lockPath());
+            }
+        }
+        table = conn.getTable(TableName.valueOf(hbaseTableName));
+    }
+
+    /**
+     * <pre>
+     * 1. when size of rowkeyStr is zero, return ID_FOR_EMPTY_STR
+     * 2. when checkPrevious set to true
+     *      1. when rowkeyStr exists in HBase and related value equals to 
expectedValue return expectedValue (side effect is put putValue)
+     *      2. else return ID_UNKNOWN
+     * 3. when checkPrevious set to false
+     *      1. when rowkeyStr not exists in HBase return putValue (side effect 
is put putValue)
+     *      2. else return ID_UNKNOWN
+     * 4. when meet non IOException, return ID_FOR_EXCEPTION
+     * 5. when meet IOException, retry foever
+     * </pre>
+     */
+    public int checkAndPutWithRetry(ByteArray columnFamily, String rowkeyStr, 
int expectedValue, int putValue,
+            boolean checkPrevious) {
+        IOException hbaseSideException;
+        int retryTimes = 0;
+        int encoedId = ID_FOR_EXCEPTION;
+        do {
+            try {
+                encoedId = checkAndPut(columnFamily, rowkeyStr, expectedValue, 
putValue, checkPrevious);
+                hbaseSideException = null;
+            } catch (IOException e) {
+                logger.error("CheckAndPut failed at " + rowkeyStr + ", 
columnFamily "
+                        + new String(columnFamily.array(), 
StandardCharsets.UTF_8), e);
+                hbaseSideException = e;
+                retryTimes++;
+                try {
+                    long sleep = 1000L * (retryTimes <= 10 ? retryTimes : 10);
+                    logger.debug("Sleep to wait set succeed for {} ms.", 
sleep);
+                    Thread.sleep(sleep);
+                } catch (InterruptedException ie) {
+                    // DO NOTHING
+                }
+            }
+        } while (hbaseSideException != null);
+        return encoedId;
+    }
+
+    int checkAndPut(ByteArray columnFamily, String rowkeyStr, int 
expectedValue, int putValue, boolean checkPrevious)
+            throws IOException {
+        byte[] rowkey = rowkeyStr.getBytes(StandardCharsets.UTF_8);
+        if (rowkey.length == 0) {
+            return ID_FOR_EMPTY_STR;
+        }
+        byte[] valueByte = 
Integer.toString(putValue).getBytes(StandardCharsets.UTF_8);
+        Put put = new Put(rowkey);
+        put.addColumn(columnFamily.array(), encodeQualifierName, valueByte);
+        put.addColumn(columnFamily.array(), tsQualifierName, 
Bytes.toBytes(System.currentTimeMillis()));
+        boolean hasPut = table.checkAndPut(rowkey, columnFamily.array(), 
encodeQualifierName,
+                checkPrevious ? 
Integer.toString(expectedValue).getBytes(StandardCharsets.UTF_8) : null, put);
+        if (hasPut) {
+            if (printValue) {
+                logger.debug("Encode {} to {}", rowkeyStr, putValue);
+            }
+            return putValue;
+        } else {
+            return ID_UNKNOWN;
+        }
+    }
+
+    /**
+     * Retry forever
+     */
+    public int encodeWithRetry(ByteArray column, String rowkeyStr) {
+        IOException hbaseSideException;
+        int retryTimes = 0;
+        int encoedId = ID_UNKNOWN;
+        do {
+            try {
+                encoedId = encode(column, rowkeyStr);
+                hbaseSideException = null;
+            } catch (IOException e) {
+                logger.error("Encode failed at " + rowkeyStr + ", column "
+                        + new String(column.array(), StandardCharsets.UTF_8), 
e);
+                hbaseSideException = e;
+                retryTimes++;
+                try {
+                    long sleep = 1000L * (retryTimes <= 10 ? retryTimes : 10);
+                    logger.debug("Sleep to wait set succeed for {} ms.", 
sleep);
+                    Thread.sleep(sleep);
+                } catch (InterruptedException ie) {
+                    // DO NOTHING
+                }
+            }
+        } while (hbaseSideException != null);
+        return encoedId;
+    }
+
+    /**
+     * Get encode integer from remote dictionary store.
+     */
+    int encode(ByteArray column, String rowkeyStr) throws IOException {
+        byte[] rowkey = rowkeyStr.getBytes(StandardCharsets.UTF_8);
+        if (rowkey.length == 0) {
+            return ID_FOR_EMPTY_STR;
+        }
+        Get get = new Get(rowkey);
+        Result res = table.get(get);
+        byte[] resBytes = res.getValue(column.array(), encodeQualifierName);
+        byte[] tsBytes = res.getValue(column.array(), tsQualifierName);
+        String realId = new String(resBytes, StandardCharsets.UTF_8);
+        String ts = new String(tsBytes, StandardCharsets.UTF_8);
+        if (printValue) {
+            logger.debug("Encode {} to {} [{}]", rowkeyStr, realId, ts);
+        }
+        return Integer.parseInt(realId);
+    }
+
+    static Connection getConnection() {
+        Configuration conf = 
HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+        try {
+            return ConnectionFactory.createConnection(conf);
+        } catch (IOException ioe) {
+            throw new IllegalStateException("Cannot connect to HBase.", ioe);
+        }
+    }
+
+    private String lockPath() {
+        return "/realtime/create_global_dict_table/" + tableName;
+    }
+}
\ No newline at end of file
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDictionaryClient.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDictionaryClient.java
new file mode 100644
index 0000000..c9d4412
--- /dev/null
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDictionaryClient.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.core.dict;
+
+import org.apache.kylin.common.Closeable;
+import org.apache.kylin.common.util.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <pre/>
+ * Used to encode string into integer for each cube at on time, it connect 
both local&remote dict store.
+ * One table/dir for each cube, one column family for each column in cube.
+ * Not thread-safe.
+ *
+ * For each receiver, it will ask allocation for a range of integer for String 
encode. Range will be [startDictId, startDictId + DICT_ID_RANGE_LEN);
+ * [startDictId, startDictId + offset) is currently be used as dictId, 
[startDictId + offset, startDictId + DICT_ID_RANGE_LEN) is not used.
+ *
+ * ID_FOR_EMPTY_STR is for empty string, from MIN_ID_FOR_NO_EMPTY_STR to 
MAX_ID_FOR_NO_EMPTY_STR is for non-empty string.
+ * </pre>
+ */
+public class StreamingDictionaryClient implements Closeable {
+
+    private static Logger logger = 
LoggerFactory.getLogger(StreamingDictionaryClient.class);
+
+    private static final ByteArray defaultCf = new 
ByteArray("default".getBytes(StandardCharsets.UTF_8));
+    private static final String GLOBAL_START_ID = "GLOBAL_START_ID";
+    public static final String MSG_TEMPLATE = "Dictionary Client Info:  
ExceptionLocal:%9d,  ExceptionRemote:%9d, HitLocal:%9d,  MissLocal:%9d,  
ALL:%9d .";
+
+    public static final int DICT_ID_RANGE_LEN = 30000;
+
+    public static final int ID_UNKNOWN = -2;
+    public static final int ID_FOR_EXCEPTION = -1;
+    public static final int ID_FOR_EMPTY_STR = 0;
+    public static final int MIN_ID_FOR_NO_EMPTY_STR = 1;
+    public static final int MAX_ID_FOR_NO_EMPTY_STR = Integer.MAX_VALUE - 
DICT_ID_RANGE_LEN - 10;
+
+    private int startDictId = -1;
+    private int offset = 0;
+
+    /** Local stats for cache hit/miss/error */
+    private AtomicLong encodeCounter = new AtomicLong();
+    private AtomicLong hitLocal = new AtomicLong();
+    private AtomicLong missLocal = new AtomicLong();
+    private AtomicLong errorLocal = new AtomicLong();
+    private AtomicLong errorRemote = new AtomicLong();
+    private long lastCheck = System.currentTimeMillis();
+    private long lastTotalError = 0;
+
+    /** Local Storage Layer*/
+    private LocalDictionaryStore localStore;
+
+    /** Remote Storage Layer*/
+    private RemoteDictionaryStore remoteStore;
+
+    public StreamingDictionaryClient(String cubeName, String[] columns) {
+        String[] columnFamily = getCf(columns);
+        try {
+            localStore = new LocalDictionaryStore(cubeName);
+            remoteStore = new RemoteDictionaryStore(cubeName);
+            remoteStore.init(columnFamily);
+            localStore.init(columnFamily);
+        } catch (Exception e) {
+            throw new RuntimeException("Init dictionary failed.", e);
+        }
+    }
+
+    /**
+     * Encoded a string into integer.
+     * Not thread-safe.
+     */
+    public int encode(ByteArray column, String value) {
+        // pre check
+        checkDictIdRange();
+        printStat();
+
+        // fetch from local
+        int localId = localStore.encode(column, value);
+        int expected = startDictId + offset;
+        if (localId >= ID_FOR_EMPTY_STR) {
+            hitLocal.addAndGet(1);
+            return localId;
+        }
+
+        // fetch from remote
+        int remoteId;
+        if (localId == ID_FOR_EXCEPTION) {
+            errorLocal.addAndGet(1);
+        } else {
+            missLocal.addAndGet(1);
+        }
+
+        remoteId = remoteStore.checkAndPutWithRetry(column, value, ID_UNKNOWN, 
expected, false);
+        if (remoteId == ID_FOR_EXCEPTION) {
+            // should be better here
+            errorRemote.addAndGet(1);
+            return MIN_ID_FOR_NO_EMPTY_STR;
+        } else if (remoteId == ID_UNKNOWN) {
+            // remote exists, fetch from remote
+            remoteId = remoteStore.encodeWithRetry(column, value);
+        } else {
+            // remote not exists, put to remote, advance offset
+            offset++;
+        }
+
+        // set back to local cache
+        if (remoteId > ID_FOR_EMPTY_STR && !localStore.put(column, value, 
remoteId)) {
+            errorLocal.addAndGet(1);
+        }
+        return remoteId;
+    }
+
+    
//=====================================================================================
+    //================================= Internal method 
===================================
+
+    void checkDictIdRange() {
+        // init startDictId
+        if (startDictId == -1) {
+            logger.debug("Init dict range.");
+            int res = remoteStore.checkAndPutWithRetry(defaultCf, 
GLOBAL_START_ID, MIN_ID_FOR_NO_EMPTY_STR,
+                    MIN_ID_FOR_NO_EMPTY_STR, false);
+            if (res != ID_UNKNOWN) {
+                logger.debug("First dictId in global.");
+                startDictId = MIN_ID_FOR_NO_EMPTY_STR;
+            } else {
+                startDictId = findStartId();
+                logger.debug("After allcated, current startDictId is {}.", 
startDictId);
+            }
+        }
+
+        // need to ask for another range
+        if (offset >= DICT_ID_RANGE_LEN - 1) {
+            logger.debug("Ask for another dictId range. Current startDictId is 
{}.", startDictId);
+            startDictId = findStartId();
+            logger.debug("After allcated, current startDictId is {}.", 
startDictId);
+            offset = 0;
+        }
+
+        if (startDictId >= MAX_ID_FOR_NO_EMPTY_STR) {
+            // do something here to fix overflow
+        }
+    }
+
+    /**
+     * Try to find a exclusive dictId range for current process.
+     */
+    int findStartId() {
+        int finalV = ID_UNKNOWN;
+        int oldV = remoteStore.encodeWithRetry(defaultCf, GLOBAL_START_ID);
+        boolean hasPut = false;
+        while (!hasPut) {
+            int res = remoteStore.checkAndPutWithRetry(defaultCf, 
GLOBAL_START_ID, oldV, oldV + DICT_ID_RANGE_LEN,
+                    true);
+            if (res == ID_UNKNOWN) { // put failed
+                oldV = remoteStore.encodeWithRetry(defaultCf, GLOBAL_START_ID);
+            } else { // put success
+                finalV = res;
+                hasPut = true;
+            }
+        }
+        return finalV;
+    }
+
+    /**
+     * Create column family for each column in one cube
+     */
+    private String[] getCf(String[] columns) {
+        String[] cfs = new String[columns.length + 1];
+        cfs[0] = "default"; // RocksDB need it
+        int idx = 1;
+        for (String col : columns) {
+            cfs[idx++] = col;
+        }
+        return cfs;
+    }
+
+    private void printStat() {
+        long curr = encodeCounter.addAndGet(1);
+        if (System.currentTimeMillis() - lastCheck >= 10000) {
+            long totalError = errorRemote.get() + errorLocal.get();
+            String msg = String.format(Locale.ROOT, MSG_TEMPLATE, 
errorLocal.get(), errorRemote.get(), hitLocal.get(),
+                    missLocal.get(), curr);
+            if (totalError > lastTotalError) {
+                logger.warn("Exception in dict\n {}", msg);
+                lastTotalError = totalError;
+            } else {
+                logger.info(msg);
+            }
+            lastCheck = System.currentTimeMillis();
+        }
+    }
+
+    @Override
+    public void close() {
+        localStore.close();
+    }
+}
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDistributedDictionary.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDistributedDictionary.java
new file mode 100644
index 0000000..8b4eca7
--- /dev/null
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/dict/StreamingDistributedDictionary.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.stream.core.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Encode string into integer in real-time and distributed way.
+ */
+public class StreamingDistributedDictionary extends Dictionary<String> {
+
+    private ByteArray cubeColumn;
+    private transient StreamingDictionaryClient streamingDictionaryClient;
+
+    public StreamingDistributedDictionary(String cubeColumn, 
StreamingDictionaryClient streamingDictionaryClient) {
+        this.streamingDictionaryClient = streamingDictionaryClient;
+        this.cubeColumn = new 
ByteArray(cubeColumn.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public int getMinId() {
+        return Integer.MIN_VALUE;
+    }
+
+    @Override
+    public int getMaxId() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int getSizeOfId() {
+        return 0;
+    }
+
+    @Override
+    public int getSizeOfValue() {
+        return 0;
+    }
+
+    @Override
+    public boolean contains(Dictionary<?> another) {
+        return false;
+    }
+
+    @Override
+    protected int getIdFromValueImpl(String value, int roundingFlag) {
+        return streamingDictionaryClient.encode(cubeColumn, value);
+    }
+
+    @Override
+    protected String getValueFromIdImpl(int id) {
+        return "";
+    }
+
+    @Override
+    public void dump(PrintStream out) {
+        throw new UnsupportedOperationException("Do not dump me.");
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("Do not copy me.");
+    }
+
+    @Override
+    public void readFields(DataInput in) {
+        throw new UnsupportedOperationException("Do not read me.");
+    }
+}
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java
index a5e082b..adc49a5 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/IStreamingSegmentStore.java
@@ -20,7 +20,10 @@ package org.apache.kylin.stream.core.storage;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.stream.core.model.StreamingMessage;
 import org.apache.kylin.stream.core.model.stats.SegmentStoreStats;
 import org.apache.kylin.stream.core.query.IStreamingGTSearcher;
@@ -30,6 +33,8 @@ public interface IStreamingSegmentStore extends 
IStreamingGTSearcher {
 
     int addEvent(StreamingMessage event);
 
+    default void addExternalDict(Map<TblColRef, Dictionary<String>> dictMap){}
+
     File getStorePath();
 
     void persist();
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index fa9b9ab..580e054 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -26,21 +26,31 @@ import java.lang.reflect.Constructor;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TimeZone;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.measure.bitmap.BitmapMeasureType;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.stream.core.consumer.IConsumerProvider;
 import org.apache.kylin.stream.core.consumer.StreamingConsumerChannel;
+import org.apache.kylin.stream.core.dict.StreamingDictionaryClient;
+import org.apache.kylin.stream.core.dict.StreamingDistributedDictionary;
 import org.apache.kylin.stream.core.model.StreamingMessage;
 import org.apache.kylin.stream.core.model.stats.LongLatencyInfo;
 import org.apache.kylin.stream.core.model.stats.SegmentStats;
@@ -61,6 +71,7 @@ public class StreamingSegmentManager implements Closeable {
     private static Logger logger = 
LoggerFactory.getLogger(StreamingSegmentManager.class);
     private final String cubeName;
     private final CubeInstance cubeInstance;
+    private AtomicBoolean closed = new AtomicBoolean(false);
 
     /**
      * Cube window defines how streaming events are divided and put into 
different segments , for example 1 hour per segment(indexer).
@@ -105,6 +116,8 @@ public class StreamingSegmentManager implements Closeable {
     private AtomicLong dropCounts = new AtomicLong();
     private volatile long latestEventTime = 0;
     private volatile long latestEventIngestTime = 0;
+    private Map<TblColRef, Dictionary<String>> dictionaryMap = new HashMap<>();
+    private StreamingDictionaryClient streamingDictionaryClient;
 
     public StreamingSegmentManager(String baseStorePath, CubeInstance 
cubeInstance, ISourcePositionHandler sourcePosHandler, IConsumerProvider 
consumerProvider) {
         this.baseStorePath = baseStorePath;
@@ -125,6 +138,27 @@ public class StreamingSegmentManager implements Closeable {
         this.longLatencyInfo = new LongLatencyInfo();
         this.checkPointStore = new CheckPointStore(cubeName, cubeDataFolder, 
cubeInstance.getConfig()
                 .getStreamingCheckPointFileMaxNum());
+
+        // Prepare for realtime dictionary encoder.
+        CubeInstance cube = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        List<MeasureDesc> bitmapMeasureList = 
cube.getDescriptor().getMeasures().stream()
+                .filter(measureDesc -> 
measureDesc.getFunction().getMeasureType() instanceof BitmapMeasureType)
+                .collect(Collectors.toList());
+        if (!bitmapMeasureList.isEmpty()) {
+            List<String> realtimeDictColumn = bitmapMeasureList.stream()
+                    .map(measureDesc -> 
measureDesc.getFunction().getParameter().getColRef().getIdentity())
+                    .collect(Collectors.toList());
+            String str = String.join(", ", realtimeDictColumn);
+            logger.info("Find these columns {} need to be encoded realtime.", 
str);
+            List<TblColRef> tblColRefs = bitmapMeasureList.stream()
+                    .map(measureDesc -> 
measureDesc.getFunction().getParameter().getColRef())
+                    .collect(Collectors.toList());
+
+            streamingDictionaryClient = new StreamingDictionaryClient(cubeName,
+                    realtimeDictColumn.toArray(new String[0]));
+            tblColRefs.forEach(col -> dictionaryMap.put(col,
+                    new StreamingDistributedDictionary(col.getIdentity(), 
streamingDictionaryClient)));
+        }
     }
 
     public void addEvent(StreamingMessage event) {
@@ -365,6 +399,7 @@ public class StreamingSegmentManager implements Closeable {
             Constructor<IStreamingSegmentStore> constructor = 
clazz.getConstructor(String.class, CubeInstance.class,
                     String.class);
             segmentStore = constructor.newInstance(baseStorePath, 
cubeInstance, segmentName);
+            segmentStore.addExternalDict(dictionaryMap);
         } catch (Exception e) {
             logger.warn("Fail to construct an instance for " + storeClassName
                     + ". Will use the default store: ColumnarSegmentStore");
@@ -556,6 +591,10 @@ public class StreamingSegmentManager implements Closeable {
 
     @Override
     public void close() {
+        if (closed.get()){
+            logger.debug("Already close it, skip.");
+            return;
+        }
         logger.warn("Closing Streaming Cube store, cubeName={}", cubeName);
         checkpoint();
         logger.warn("{} ingested {} , dropped {}, long latency {}", cubeName, 
ingestCount.get(), dropCounts.get(),
@@ -567,6 +606,10 @@ public class StreamingSegmentManager implements Closeable {
                 logger.error("fail to close cube segment, segment :" + 
cubesegment.getSegmentName(), e);
             }
         }
+        if (streamingDictionaryClient != null) {
+            streamingDictionaryClient.close();
+        }
+        closed.set(true);
     }
 
     public synchronized void checkpoint() {
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
index 7d61e52..14a92f7 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -35,7 +36,9 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.stream.core.exception.IllegalStorageException;
 import org.apache.kylin.stream.core.metrics.StreamingMetrics;
 import org.apache.kylin.stream.core.model.StreamingMessage;
@@ -92,6 +95,8 @@ public class ColumnarSegmentStore implements 
IStreamingSegmentStore {
     private List<DataSegmentFragment> fragments = 
Lists.newCopyOnWriteArrayList();
     protected int latestCheckpointFragment = 0;
 
+    private Map<TblColRef, Dictionary<String>> dictionaryMap;
+
     public ColumnarSegmentStore(String baseStorePath, CubeInstance 
cubeInstance, String segmentName) {
         this.maxRowsInMemory = 
cubeInstance.getConfig().getStreamingIndexMaxRows();
         this.baseStorePath = baseStorePath;
@@ -139,6 +144,12 @@ public class ColumnarSegmentStore implements 
IStreamingSegmentStore {
     }
 
     @Override
+    public void addExternalDict(Map<TblColRef, Dictionary<String>> dictMap) {
+        this.dictionaryMap = dictMap;
+        this.activeMemoryStore.setDictionaryMap(dictMap);
+    }
+
+    @Override
     public File getStorePath() {
         return dataSegmentFolder;
     }
@@ -163,6 +174,7 @@ public class ColumnarSegmentStore implements 
IStreamingSegmentStore {
             newFragment = createNewFragment();
             persistingMemoryStore = activeMemoryStore;
             activeMemoryStore = new 
SegmentMemoryStore(parsedStreamingCubeInfo, segmentName);
+            activeMemoryStore.setDictionaryMap(dictionaryMap);
         } finally {
             persistWriteLock.unlock();
         }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
index 1b0bb6a..243fd05 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
@@ -28,7 +28,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.topn.TopNAggregator;
@@ -71,6 +71,12 @@ public class SegmentMemoryStore implements 
IStreamingGTSearcher {
     private long minEventTime = Long.MAX_VALUE;
     private long maxEventTime = 0;
 
+    private Map<TblColRef, Dictionary<String>> dictionaryMap;
+
+    public void setDictionaryMap(Map<TblColRef, Dictionary<String>> 
dictionaryMap) {
+        this.dictionaryMap = dictionaryMap;
+    }
+
     public SegmentMemoryStore(ParsedStreamingCubeInfo parsedStreamingCubeInfo, 
String segmentName) {
         this.parsedStreamingCubeInfo = parsedStreamingCubeInfo;
         this.segmentName = segmentName;
@@ -84,7 +90,6 @@ public class SegmentMemoryStore implements 
IStreamingGTSearcher {
                         StringArrayComparator.INSTANCE));
             }
         }
-
     }
 
     public int index(StreamingMessage event) {
@@ -162,8 +167,7 @@ public class SegmentMemoryStore implements 
IStreamingGTSearcher {
             }
             inputToMeasure[i] = value;
         }
-
-        return 
parsedStreamingCubeInfo.measureIngesters[idxOfMeasure].valueOf(inputToMeasure, 
measure, null);
+        return 
parsedStreamingCubeInfo.measureIngesters[idxOfMeasure].valueOf(inputToMeasure, 
measure, dictionaryMap);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 408a5e7a..95630c6 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -608,6 +608,7 @@ public class StreamingServer implements 
ReplicaSetLeaderSelector.LeaderChangeLis
         StreamingSegmentManager segmentManager = 
getStreamingSegmentManager(cubeName);
         if (segmentManager != null) {
             streamingSegmentManagerMap.remove(cubeName);
+            segmentManager.close();
             segmentManager.purgeAllSegments();
         }
     }

Reply via email to