http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index db1a170..f2edcb4 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -1,285 +1,285 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import com.google.common.base.Strings;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TrieDictionary;
-import org.apache.kylin.dict.TrieDictionaryBuilder;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * @author yangli9
- */
-@SuppressWarnings("serial")
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
-public class SnapshotTable extends RootPersistentEntity implements 
ReadableTable {
-
-    @JsonProperty("tableName")
-    private String tableName;
-    @JsonProperty("signature")
-    private TableSignature signature;
-    @JsonProperty("useDictionary")
-    private boolean useDictionary;
-
-    private ArrayList<int[]> rowIndices;
-    private Dictionary<String> dict;
-
-    // default constructor for JSON serialization
-    public SnapshotTable() {
-    }
-
-    SnapshotTable(ReadableTable table, String tableName) throws IOException {
-        this.tableName = tableName;
-        this.signature = table.getSignature();
-        this.useDictionary = true;
-    }
-
-    public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws 
IOException {
-        this.signature = table.getSignature();
-
-        int maxIndex = tableDesc.getMaxColumnIndex();
-
-        TrieDictionaryBuilder<String> b = new 
TrieDictionaryBuilder<String>(new StringBytesConverter());
-
-        TableReader reader = table.getReader();
-        try {
-            while (reader.next()) {
-                String[] row = reader.getRow();
-                if (row.length <= maxIndex) {
-                    throw new IllegalStateException("Bad hive table row, " + 
tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + 
Arrays.toString(row));
-                }
-                for (ColumnDesc column : tableDesc.getColumns()) {
-                    String cell = row[column.getZeroBasedIndex()];
-                    if (cell != null)
-                        b.addValue(cell);
-                }
-            }
-        } finally {
-            IOUtils.closeQuietly(reader);
-        }
-
-        this.dict = b.build(0);
-
-        ArrayList<int[]> allRowIndices = new ArrayList<int[]>();
-        reader = table.getReader();
-        try {
-            while (reader.next()) {
-                String[] row = reader.getRow();
-                int[] rowIndex = new int[tableDesc.getColumnCount()];
-                for (ColumnDesc column : tableDesc.getColumns()) {
-                    rowIndex[column.getZeroBasedIndex()] = 
dict.getIdFromValue(row[column.getZeroBasedIndex()]);
-                }
-                allRowIndices.add(rowIndex);
-            }
-        } finally {
-            IOUtils.closeQuietly(reader);
-        }
-
-        this.rowIndices = allRowIndices;
-    }
-
-    public String getResourcePath() {
-        return getResourceDir() + "/" + uuid + ".snapshot";
-    }
-
-    public String getResourceDir() {
-        if (Strings.isNullOrEmpty(tableName)) {
-            return getOldResourceDir();
-        } else {
-            return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
-        }
-    }
-
-    private String getOldResourceDir() {
-        return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new 
File(signature.getPath()).getName();
-    }
-
-    @Override
-    public TableReader getReader() throws IOException {
-        return new TableReader() {
-
-            int i = -1;
-
-            @Override
-            public boolean next() throws IOException {
-                i++;
-                return i < rowIndices.size();
-            }
-
-            @Override
-            public String[] getRow() {
-                int[] rowIndex = rowIndices.get(i);
-                String[] row = new String[rowIndex.length];
-                for (int x = 0; x < row.length; x++) {
-                    row[x] = dict.getValueFromId(rowIndex[x]);
-                }
-                return row;
-            }
-
-            @Override
-            public void close() throws IOException {
-            }
-        };
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        return signature;
-    }
-
-    /**
-     * a naive implementation
-     *
-     * @return
-     */
-    @Override
-    public int hashCode() {
-        int[] parts = new int[this.rowIndices.size()];
-        for (int i = 0; i < parts.length; ++i)
-            parts[i] = Arrays.hashCode(this.rowIndices.get(i));
-        return Arrays.hashCode(parts);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if ((o instanceof SnapshotTable) == false)
-            return false;
-        SnapshotTable that = (SnapshotTable) o;
-
-        if (this.dict.equals(that.dict) == false)
-            return false;
-
-        //compare row by row
-        if (this.rowIndices.size() != that.rowIndices.size())
-            return false;
-        for (int i = 0; i < this.rowIndices.size(); ++i) {
-            if (!ArrayUtils.isEquals(this.rowIndices.get(i), 
that.rowIndices.get(i)))
-                return false;
-        }
-
-        return true;
-    }
-
-    private static String NULL_STR;
-    {
-        try {
-            // a special placeholder to indicate a NULL; 0, 9, 127, 255 are a 
few invisible ASCII characters
-            NULL_STR = new String(new byte[] { 0, 9, 127, (byte) 255 }, 
"ISO-8859-1");
-        } catch (UnsupportedEncodingException e) {
-            // does not happen
-        }
-    }
-
-    void writeData(DataOutput out) throws IOException {
-        out.writeInt(rowIndices.size());
-        if (rowIndices.size() > 0) {
-            int n = rowIndices.get(0).length;
-            out.writeInt(n);
-
-            if (this.useDictionary == true) {
-                dict.write(out);
-                for (int i = 0; i < rowIndices.size(); i++) {
-                    int[] row = rowIndices.get(i);
-                    for (int j = 0; j < n; j++) {
-                        out.writeInt(row[j]);
-                    }
-                }
-
-            } else {
-                for (int i = 0; i < rowIndices.size(); i++) {
-                    int[] row = rowIndices.get(i);
-                    for (int j = 0; j < n; j++) {
-                        // NULL_STR is tricky, but we don't want to break the 
current snapshots
-                        out.writeUTF(dict.getValueFromId(row[j]) == null ? 
NULL_STR : dict.getValueFromId(row[j]));
-                    }
-                }
-            }
-        }
-    }
-
-    void readData(DataInput in) throws IOException {
-        int rowNum = in.readInt();
-        if (rowNum > 0) {
-            int n = in.readInt();
-            rowIndices = new ArrayList<int[]>(rowNum);
-
-            if (this.useDictionary == true) {
-                this.dict = new TrieDictionary<String>();
-                dict.readFields(in);
-
-                for (int i = 0; i < rowNum; i++) {
-                    int[] row = new int[n];
-                    this.rowIndices.add(row);
-                    for (int j = 0; j < n; j++) {
-                        row[j] = in.readInt();
-                    }
-                }
-            } else {
-                List<String[]> rows = new ArrayList<String[]>(rowNum);
-                TrieDictionaryBuilder<String> b = new 
TrieDictionaryBuilder<String>(new StringBytesConverter());
-
-                for (int i = 0; i < rowNum; i++) {
-                    String[] row = new String[n];
-                    rows.add(row);
-                    for (int j = 0; j < n; j++) {
-                        row[j] = in.readUTF();
-                        // NULL_STR is tricky, but we don't want to break the 
current snapshots
-                        if (row[j].equals(NULL_STR))
-                            row[j] = null;
-
-                        b.addValue(row[j]);
-                    }
-                }
-                this.dict = b.build(0);
-                for (String[] row : rows) {
-                    int[] rowIndex = new int[n];
-                    for (int i = 0; i < n; i++) {
-                        rowIndex[i] = dict.getIdFromValue(row[i]);
-                    }
-                    this.rowIndices.add(rowIndex);
-                }
-            }
-        } else {
-            rowIndices = new ArrayList<int[]>();
-            dict = new TrieDictionary<String>();
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.base.Strings;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * @author yangli9
+ */
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
+public class SnapshotTable extends RootPersistentEntity implements 
ReadableTable {
+
+    @JsonProperty("tableName")
+    private String tableName;
+    @JsonProperty("signature")
+    private TableSignature signature;
+    @JsonProperty("useDictionary")
+    private boolean useDictionary;
+
+    private ArrayList<int[]> rowIndices;
+    private Dictionary<String> dict;
+
+    // default constructor for JSON serialization
+    public SnapshotTable() {
+    }
+
+    SnapshotTable(ReadableTable table, String tableName) throws IOException {
+        this.tableName = tableName;
+        this.signature = table.getSignature();
+        this.useDictionary = true;
+    }
+
+    public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws 
IOException {
+        this.signature = table.getSignature();
+
+        int maxIndex = tableDesc.getMaxColumnIndex();
+
+        TrieDictionaryBuilder<String> b = new 
TrieDictionaryBuilder<String>(new StringBytesConverter());
+
+        TableReader reader = table.getReader();
+        try {
+            while (reader.next()) {
+                String[] row = reader.getRow();
+                if (row.length <= maxIndex) {
+                    throw new IllegalStateException("Bad hive table row, " + 
tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + 
Arrays.toString(row));
+                }
+                for (ColumnDesc column : tableDesc.getColumns()) {
+                    String cell = row[column.getZeroBasedIndex()];
+                    if (cell != null)
+                        b.addValue(cell);
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+
+        this.dict = b.build(0);
+
+        ArrayList<int[]> allRowIndices = new ArrayList<int[]>();
+        reader = table.getReader();
+        try {
+            while (reader.next()) {
+                String[] row = reader.getRow();
+                int[] rowIndex = new int[tableDesc.getColumnCount()];
+                for (ColumnDesc column : tableDesc.getColumns()) {
+                    rowIndex[column.getZeroBasedIndex()] = 
dict.getIdFromValue(row[column.getZeroBasedIndex()]);
+                }
+                allRowIndices.add(rowIndex);
+            }
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+
+        this.rowIndices = allRowIndices;
+    }
+
+    public String getResourcePath() {
+        return getResourceDir() + "/" + uuid + ".snapshot";
+    }
+
+    public String getResourceDir() {
+        if (Strings.isNullOrEmpty(tableName)) {
+            return getOldResourceDir();
+        } else {
+            return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+        }
+    }
+
+    private String getOldResourceDir() {
+        return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new 
File(signature.getPath()).getName();
+    }
+
+    @Override
+    public TableReader getReader() throws IOException {
+        return new TableReader() {
+
+            int i = -1;
+
+            @Override
+            public boolean next() throws IOException {
+                i++;
+                return i < rowIndices.size();
+            }
+
+            @Override
+            public String[] getRow() {
+                int[] rowIndex = rowIndices.get(i);
+                String[] row = new String[rowIndex.length];
+                for (int x = 0; x < row.length; x++) {
+                    row[x] = dict.getValueFromId(rowIndex[x]);
+                }
+                return row;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+        };
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        return signature;
+    }
+
+    /**
+     * a naive implementation
+     *
+     * @return
+     */
+    @Override
+    public int hashCode() {
+        int[] parts = new int[this.rowIndices.size()];
+        for (int i = 0; i < parts.length; ++i)
+            parts[i] = Arrays.hashCode(this.rowIndices.get(i));
+        return Arrays.hashCode(parts);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if ((o instanceof SnapshotTable) == false)
+            return false;
+        SnapshotTable that = (SnapshotTable) o;
+
+        if (this.dict.equals(that.dict) == false)
+            return false;
+
+        //compare row by row
+        if (this.rowIndices.size() != that.rowIndices.size())
+            return false;
+        for (int i = 0; i < this.rowIndices.size(); ++i) {
+            if (!ArrayUtils.isEquals(this.rowIndices.get(i), 
that.rowIndices.get(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    private static String NULL_STR;
+    {
+        try {
+            // a special placeholder to indicate a NULL; 0, 9, 127, 255 are a 
few invisible ASCII characters
+            NULL_STR = new String(new byte[] { 0, 9, 127, (byte) 255 }, 
"ISO-8859-1");
+        } catch (UnsupportedEncodingException e) {
+            // does not happen
+        }
+    }
+
+    void writeData(DataOutput out) throws IOException {
+        out.writeInt(rowIndices.size());
+        if (rowIndices.size() > 0) {
+            int n = rowIndices.get(0).length;
+            out.writeInt(n);
+
+            if (this.useDictionary == true) {
+                dict.write(out);
+                for (int i = 0; i < rowIndices.size(); i++) {
+                    int[] row = rowIndices.get(i);
+                    for (int j = 0; j < n; j++) {
+                        out.writeInt(row[j]);
+                    }
+                }
+
+            } else {
+                for (int i = 0; i < rowIndices.size(); i++) {
+                    int[] row = rowIndices.get(i);
+                    for (int j = 0; j < n; j++) {
+                        // NULL_STR is tricky, but we don't want to break the 
current snapshots
+                        out.writeUTF(dict.getValueFromId(row[j]) == null ? 
NULL_STR : dict.getValueFromId(row[j]));
+                    }
+                }
+            }
+        }
+    }
+
+    void readData(DataInput in) throws IOException {
+        int rowNum = in.readInt();
+        if (rowNum > 0) {
+            int n = in.readInt();
+            rowIndices = new ArrayList<int[]>(rowNum);
+
+            if (this.useDictionary == true) {
+                this.dict = new TrieDictionary<String>();
+                dict.readFields(in);
+
+                for (int i = 0; i < rowNum; i++) {
+                    int[] row = new int[n];
+                    this.rowIndices.add(row);
+                    for (int j = 0; j < n; j++) {
+                        row[j] = in.readInt();
+                    }
+                }
+            } else {
+                List<String[]> rows = new ArrayList<String[]>(rowNum);
+                TrieDictionaryBuilder<String> b = new 
TrieDictionaryBuilder<String>(new StringBytesConverter());
+
+                for (int i = 0; i < rowNum; i++) {
+                    String[] row = new String[n];
+                    rows.add(row);
+                    for (int j = 0; j < n; j++) {
+                        row[j] = in.readUTF();
+                        // NULL_STR is tricky, but we don't want to break the 
current snapshots
+                        if (row[j].equals(NULL_STR))
+                            row[j] = null;
+
+                        b.addValue(row[j]);
+                    }
+                }
+                this.dict = b.build(0);
+                for (String[] row : rows) {
+                    int[] rowIndex = new int[n];
+                    for (int i = 0; i < n; i++) {
+                        rowIndex[i] = dict.getIdFromValue(row[i]);
+                    }
+                    this.rowIndices.add(rowIndex);
+                }
+            }
+        } else {
+            rowIndices = new ArrayList<int[]>();
+            dict = new TrieDictionary<String>();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
index f5663a5..02164d6 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
@@ -1,79 +1,79 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.util.JsonUtil;
-
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * @author yangli9
- * 
- */
-public class SnapshotTableSerializer implements Serializer<SnapshotTable> {
-
-    public static final SnapshotTableSerializer FULL_SERIALIZER = new 
SnapshotTableSerializer(false);
-    public static final SnapshotTableSerializer INFO_SERIALIZER = new 
SnapshotTableSerializer(true);
-
-    private boolean infoOnly;
-
-    SnapshotTableSerializer(boolean infoOnly) {
-        this.infoOnly = infoOnly;
-    }
-
-    @Override
-    public void serialize(SnapshotTable obj, DataOutputStream out) throws 
IOException {
-        String json = JsonUtil.writeValueAsIndentString(obj);
-        out.writeUTF(json);
-
-        if (infoOnly == false)
-            obj.writeData(out);
-    }
-
-    @Override
-    public SnapshotTable deserialize(DataInputStream in) throws IOException {
-        String json = in.readUTF();
-        SnapshotTable obj = JsonUtil.readValue(json, SnapshotTable.class);
-
-        if (infoOnly == false)
-            obj.readData(in);
-
-        return obj;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
+
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @author yangli9
+ * 
+ */
+public class SnapshotTableSerializer implements Serializer<SnapshotTable> {
+
+    public static final SnapshotTableSerializer FULL_SERIALIZER = new 
SnapshotTableSerializer(false);
+    public static final SnapshotTableSerializer INFO_SERIALIZER = new 
SnapshotTableSerializer(true);
+
+    private boolean infoOnly;
+
+    SnapshotTableSerializer(boolean infoOnly) {
+        this.infoOnly = infoOnly;
+    }
+
+    @Override
+    public void serialize(SnapshotTable obj, DataOutputStream out) throws 
IOException {
+        String json = JsonUtil.writeValueAsIndentString(obj);
+        out.writeUTF(json);
+
+        if (infoOnly == false)
+            obj.writeData(out);
+    }
+
+    @Override
+    public SnapshotTable deserialize(DataInputStream in) throws IOException {
+        String json = in.readUTF();
+        SnapshotTable obj = JsonUtil.readValue(json, SnapshotTable.class);
+
+        if (infoOnly == false)
+            obj.readData(in);
+
+        return obj;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
 
b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 4266f2a..5e1705a 100644
--- 
a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ 
b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -59,7 +59,7 @@ public class AppendTrieDictionaryTest {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         config.setAppendDictEntrySize(50000);
         config.setAppendDictCacheSize(3);
-        config.setProperty("kylin.hdfs.working.dir", BASE_DIR);
+        config.setProperty("kylin.env.hdfs-working-dir", BASE_DIR);
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 3937a24..be07d76 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -57,9 +57,9 @@ import com.google.common.collect.Maps;
  * schedule the cubing jobs when several job server running with the same 
metadata.
  *
  * to enable the distributed job server, you need to set and update three 
configs in the kylin.properties:
- *  1. kylin.enable.scheduler=2
- *  2. 
kylin.job.controller.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock
- *  3. add all the job servers and query servers to the kylin.rest.servers
+ *  1. kylin.job.scheduler.default=2
+ *  2. 
kylin.job.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock
+ *  3. add all the job servers and query servers to the 
kylin.server.cluster-servers
  */
 public class DistributedScheduler implements Scheduler<AbstractExecutable>, 
ConnectionStateListener {
     private ExecutableManager executableManager;

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index 0fc96c3..500a5b5 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -1,52 +1,52 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata;
-
-/**
- * Constances to describe metadata and it's change.
- * 
- */
-public interface MetadataConstants {
-
-    public static final String FILE_SURFIX = ".json";
-
-    // Extended attribute keys
-    public static final String TABLE_EXD_STATUS_KEY = "EXD_STATUS";
-    public static final String TABLE_EXD_MINFS = "minFileSize";
-    public static final String TABLE_EXD_TNF = "totalNumberFiles";
-    public static final String TABLE_EXD_LOCATION = "location";
-    public static final String TABLE_EXD_LUT = "lastUpdateTime";
-    public static final String TABLE_EXD_LAT = "lastAccessTime";
-    public static final String TABLE_EXD_COLUMN = "columns";
-    public static final String TABLE_EXD_PC = "partitionColumns";
-    public static final String TABLE_EXD_MAXFS = "maxFileSize";
-    public static final String TABLE_EXD_IF = "inputformat";
-    public static final String TABLE_EXD_PARTITIONED = "partitioned";
-    public static final String TABLE_EXD_TABLENAME = "tableName";
-    public static final String TABLE_EXD_OWNER = "owner";
-    public static final String TABLE_EXD_TFS = "totalFileSize";
-    public static final String TABLE_EXD_OF = "outputformat";
-    /**
-     * The value is an array
-     */
-    public static final String TABLE_EXD_CARDINALITY = "cardinality";
-    public static final String TABLE_EXD_DELIM = "delim";
-    public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata;
+
+/**
+ * Constances to describe metadata and it's change.
+ * 
+ */
+public interface MetadataConstants {
+
+    public static final String FILE_SURFIX = ".json";
+
+    // Extended attribute keys
+    public static final String TABLE_EXD_STATUS_KEY = "EXD_STATUS";
+    public static final String TABLE_EXD_MINFS = "minFileSize";
+    public static final String TABLE_EXD_TNF = "totalNumberFiles";
+    public static final String TABLE_EXD_LOCATION = "location";
+    public static final String TABLE_EXD_LUT = "lastUpdateTime";
+    public static final String TABLE_EXD_LAT = "lastAccessTime";
+    public static final String TABLE_EXD_COLUMN = "columns";
+    public static final String TABLE_EXD_PC = "partitionColumns";
+    public static final String TABLE_EXD_MAXFS = "maxFileSize";
+    public static final String TABLE_EXD_IF = "inputformat";
+    public static final String TABLE_EXD_PARTITIONED = "partitioned";
+    public static final String TABLE_EXD_TABLENAME = "tableName";
+    public static final String TABLE_EXD_OWNER = "owner";
+    public static final String TABLE_EXD_TFS = "totalFileSize";
+    public static final String TABLE_EXD_OF = "outputformat";
+    /**
+     * The value is an array
+     */
+    public static final String TABLE_EXD_CARDINALITY = "cardinality";
+    public static final String TABLE_EXD_DELIM = "delim";
+    public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 8d34cc0..6bc5771 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -102,7 +102,7 @@ public class Broadcaster {
 
         final String[] nodes = config.getRestServers();
         if (nodes == null || nodes.length < 1) {
-            logger.warn("There is no available rest server; check the 
'kylin.rest.servers' config");
+            logger.warn("There is no available rest server; check the 
'kylin.server.cluster-servers' config");
             broadcastEvents = null; // disable the broadcaster
             return;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
index 630156f..5d0409a 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
@@ -1,77 +1,77 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.model;
-
-import org.apache.kylin.metadata.model.DataModelDesc.TableKind;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+import org.apache.kylin.metadata.model.DataModelDesc.TableKind;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
-public class JoinTableDesc {
-
-    @JsonProperty("table")
-    private String table;
-
-    @JsonProperty("kind")
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private TableKind kind = TableKind.LOOKUP;
-    
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
+public class JoinTableDesc {
+
+    @JsonProperty("table")
+    private String table;
+
+    @JsonProperty("kind")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private TableKind kind = TableKind.LOOKUP;
+    
     @JsonProperty("alias")
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private String alias;
     
-    @JsonProperty("join")
-    private JoinDesc join;
-    
+    @JsonProperty("join")
+    private JoinDesc join;
+    
     private TableRef tableRef;
-
-    public String getTable() {
-        return table;
-    }
-
+
+    public String getTable() {
+        return table;
+    }
+
     void setTable(String table) {
-        this.table = table;
-    }
-
-    public TableKind getKind() {
-        return kind;
-    }
-    
+        this.table = table;
+    }
+
+    public TableKind getKind() {
+        return kind;
+    }
+    
     public String getAlias() {
         return alias;
     }
     
-    public JoinDesc getJoin() {
-        return join;
-    }
-
+    public JoinDesc getJoin() {
+        return join;
+    }
+
     public TableRef getTableRef() {
         return tableRef;
-    }
-
+    }
+
     void setTableRef(TableRef ref) {
         this.tableRef = ref;
-    }
-    
-    
-
-}
+    }
+    
+    
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
index 742cfba..c4042c6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
@@ -1,43 +1,43 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.tuple;
-
-import java.util.List;
-
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- * Tuple is a record row, contains multiple values being lookup by either field
- * (calcite notion) or column (kylin notion).
- * 
- * @author yangli9
- */
-public interface ITuple extends IEvaluatableTuple, Cloneable {
-
-    List<String> getAllFields();
-
-    List<TblColRef> getAllColumns();
-
-    Object[] getAllValues();
-
-    ITuple makeCopy();
-
-    // declared from IEvaluatableTuple:  public Object getValue(TblColRef col);
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.tuple;
+
+import java.util.List;
+
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Tuple is a record row, contains multiple values being lookup by either field
+ * (calcite notion) or column (kylin notion).
+ * 
+ * @author yangli9
+ */
+public interface ITuple extends IEvaluatableTuple, Cloneable {
+
+    List<String> getAllFields();
+
+    List<TblColRef> getAllColumns();
+
+    Object[] getAllValues();
+
+    ITuple makeCopy();
+
+    // declared from IEvaluatableTuple:  public Object getValue(TblColRef col);
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java 
b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index b338b3c..719cab6 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -131,7 +131,7 @@ public class StorageContext {
         if (!realization.supportsLimitPushDown()) {
             logger.info("Not enabling limit push down because cube storage 
type not supported");
         } else if (tempPushDownLimit > pushDownLimitMax) {
-            logger.info("Not enabling limit push down because the 
limit(including offset) {} is larger than kylin.query.pushdown.limit.max {}", //
+            logger.info("Not enabling limit push down because the 
limit(including offset) {} is larger than kylin.query.max-limit-pushdown {}", //
                     tempPushDownLimit, pushDownLimitMax);
         } else {
             this.finalPushDownLimit = tempPushDownLimit;

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index 83fa32d..160338f 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -1,200 +1,200 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.cube.model.CubeDesc.DeriveType;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.StringCodeSystem;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * @author yangli9
- */
-public class DerivedFilterTranslator {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(DerivedFilterTranslator.class);
-
-    public static Pair<TupleFilter, Boolean> translate(LookupStringTable 
lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
-
-        TblColRef derivedCol = compf.getColumn();
-        TblColRef[] hostCols = hostInfo.columns;
-        TblColRef[] pkCols = 
hostInfo.dimension.getJoin().getPrimaryKeyColumns();
-
-        if (hostInfo.type == DeriveType.PK_FK) {
-            assert hostCols.length == 1;
-            CompareTupleFilter newComp = new 
CompareTupleFilter(compf.getOperator());
-            newComp.addChild(new ColumnTupleFilter(hostCols[0]));
-            newComp.addChild(new ConstantTupleFilter(compf.getValues()));
-            return new Pair<TupleFilter, Boolean>(newComp, false);
-        }
-
-        assert hostInfo.type == DeriveType.LOOKUP;
-        assert hostCols.length == pkCols.length;
-
-        int di = derivedCol.getColumnDesc().getZeroBasedIndex();
-        int[] pi = new int[pkCols.length];
-        int hn = hostCols.length;
-        for (int i = 0; i < hn; i++) {
-            pi[i] = pkCols[i].getColumnDesc().getZeroBasedIndex();
-        }
-
-        Set<Array<String>> satisfyingHostRecords = Sets.newHashSet();
-        SingleColumnTuple tuple = new SingleColumnTuple(derivedCol);
-        for (String[] row : lookup.getAllRows()) {
-            tuple.value = row[di];
-            if (compf.evaluate(tuple, StringCodeSystem.INSTANCE)) {
-                collect(row, pi, satisfyingHostRecords);
-            }
-        }
-
-        TupleFilter translated;
-        boolean loosened;
-        if (satisfyingHostRecords.size() > 
KylinConfig.getInstanceFromEnv().getDerivedInThreshold()) {
-            logger.info("Deciding to loosen filter on derived filter as host 
candidates number {} exceeds threshold {}", //
-                    satisfyingHostRecords.size(), 
KylinConfig.getInstanceFromEnv().getDerivedInThreshold()
-            );
-            translated = buildRangeFilter(hostCols, satisfyingHostRecords);
-            loosened = true;
-        } else {
-            translated = buildInFilter(hostCols, satisfyingHostRecords);
-            loosened = false;
-        }
-
-        return new Pair<TupleFilter, Boolean>(translated, loosened);
-    }
-
-    private static void collect(String[] row, int[] pi, Set<Array<String>> 
satisfyingHostRecords) {
-        // TODO when go beyond IN_THRESHOLD, only keep min/max is enough
-        String[] rec = new String[pi.length];
-        for (int i = 0; i < pi.length; i++) {
-            rec[i] = row[pi[i]];
-        }
-        satisfyingHostRecords.add(new Array<String>(rec));
-    }
-
-    private static TupleFilter buildInFilter(TblColRef[] hostCols, 
Set<Array<String>> satisfyingHostRecords) {
-        if (satisfyingHostRecords.size() == 0) {
-            return ConstantTupleFilter.FALSE;
-        }
-
-        int hn = hostCols.length;
-        if (hn == 1) {
-            CompareTupleFilter in = new 
CompareTupleFilter(FilterOperatorEnum.IN);
-            in.addChild(new ColumnTupleFilter(hostCols[0]));
-            in.addChild(new 
ConstantTupleFilter(asValues(satisfyingHostRecords)));
-            return in;
-        } else {
-            LogicalTupleFilter or = new 
LogicalTupleFilter(FilterOperatorEnum.OR);
-            for (Array<String> rec : satisfyingHostRecords) {
-                LogicalTupleFilter and = new 
LogicalTupleFilter(FilterOperatorEnum.AND);
-                for (int i = 0; i < hn; i++) {
-                    CompareTupleFilter eq = new 
CompareTupleFilter(FilterOperatorEnum.EQ);
-                    eq.addChild(new ColumnTupleFilter(hostCols[i]));
-                    eq.addChild(new ConstantTupleFilter(rec.data[i]));
-                    and.addChild(eq);
-                }
-                or.addChild(and);
-            }
-            return or;
-        }
-    }
-
-    private static List<String> asValues(Set<Array<String>> 
satisfyingHostRecords) {
-        List<String> values = 
Lists.newArrayListWithCapacity(satisfyingHostRecords.size());
-        for (Array<String> rec : satisfyingHostRecords) {
-            values.add(rec.data[0]);
-        }
-        return values;
-    }
-
-    private static LogicalTupleFilter buildRangeFilter(TblColRef[] hostCols, 
Set<Array<String>> satisfyingHostRecords) {
-        int hn = hostCols.length;
-        String[] min = new String[hn];
-        String[] max = new String[hn];
-        findMinMax(satisfyingHostRecords, hostCols, min, max);
-        LogicalTupleFilter and = new 
LogicalTupleFilter(FilterOperatorEnum.AND);
-        for (int i = 0; i < hn; i++) {
-            CompareTupleFilter compMin = new 
CompareTupleFilter(FilterOperatorEnum.GTE);
-            compMin.addChild(new ColumnTupleFilter(hostCols[i]));
-            compMin.addChild(new ConstantTupleFilter(min[i]));
-            and.addChild(compMin);
-            CompareTupleFilter compMax = new 
CompareTupleFilter(FilterOperatorEnum.LTE);
-            compMax.addChild(new ColumnTupleFilter(hostCols[i]));
-            compMax.addChild(new ConstantTupleFilter(max[i]));
-            and.addChild(compMax);
-        }
-        return and;
-    }
-
-    private static void findMinMax(Set<Array<String>> satisfyingHostRecords, 
TblColRef[] hostCols, String[] min, String[] max) {
-
-        RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length];
-        for (int i = 0; i < hostCols.length; i++) {
-            orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType());
-        }
-
-        for (Array<String> rec : satisfyingHostRecords) {
-            String[] row = rec.data;
-            for (int i = 0; i < row.length; i++) {
-                min[i] = orders[i].min(min[i], row[i]);
-                max[i] = orders[i].max(max[i], row[i]);
-            }
-        }
-    }
-
-    private static class SingleColumnTuple implements IEvaluatableTuple {
-
-        private TblColRef col;
-        private String value;
-
-        SingleColumnTuple(TblColRef col) {
-            this.col = col;
-        }
-
-        @Override
-        public Object getValue(TblColRef col) {
-            if (this.col.equals(col))
-                return value;
-            else
-                throw new IllegalArgumentException("unexpected column " + col);
-        }
-
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.translate;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.kv.RowKeyColumnOrder;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.cube.model.CubeDesc.DeriveType;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.StringCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * @author yangli9
+ */
+public class DerivedFilterTranslator {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(DerivedFilterTranslator.class);
+
+    public static Pair<TupleFilter, Boolean> translate(LookupStringTable 
lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
+
+        TblColRef derivedCol = compf.getColumn();
+        TblColRef[] hostCols = hostInfo.columns;
+        TblColRef[] pkCols = 
hostInfo.dimension.getJoin().getPrimaryKeyColumns();
+
+        if (hostInfo.type == DeriveType.PK_FK) {
+            assert hostCols.length == 1;
+            CompareTupleFilter newComp = new 
CompareTupleFilter(compf.getOperator());
+            newComp.addChild(new ColumnTupleFilter(hostCols[0]));
+            newComp.addChild(new ConstantTupleFilter(compf.getValues()));
+            return new Pair<TupleFilter, Boolean>(newComp, false);
+        }
+
+        assert hostInfo.type == DeriveType.LOOKUP;
+        assert hostCols.length == pkCols.length;
+
+        int di = derivedCol.getColumnDesc().getZeroBasedIndex();
+        int[] pi = new int[pkCols.length];
+        int hn = hostCols.length;
+        for (int i = 0; i < hn; i++) {
+            pi[i] = pkCols[i].getColumnDesc().getZeroBasedIndex();
+        }
+
+        Set<Array<String>> satisfyingHostRecords = Sets.newHashSet();
+        SingleColumnTuple tuple = new SingleColumnTuple(derivedCol);
+        for (String[] row : lookup.getAllRows()) {
+            tuple.value = row[di];
+            if (compf.evaluate(tuple, StringCodeSystem.INSTANCE)) {
+                collect(row, pi, satisfyingHostRecords);
+            }
+        }
+
+        TupleFilter translated;
+        boolean loosened;
+        if (satisfyingHostRecords.size() > 
KylinConfig.getInstanceFromEnv().getDerivedInThreshold()) {
+            logger.info("Deciding to loosen filter on derived filter as host 
candidates number {} exceeds threshold {}", //
+                    satisfyingHostRecords.size(), 
KylinConfig.getInstanceFromEnv().getDerivedInThreshold()
+            );
+            translated = buildRangeFilter(hostCols, satisfyingHostRecords);
+            loosened = true;
+        } else {
+            translated = buildInFilter(hostCols, satisfyingHostRecords);
+            loosened = false;
+        }
+
+        return new Pair<TupleFilter, Boolean>(translated, loosened);
+    }
+
+    private static void collect(String[] row, int[] pi, Set<Array<String>> 
satisfyingHostRecords) {
+        // TODO when go beyond IN_THRESHOLD, only keep min/max is enough
+        String[] rec = new String[pi.length];
+        for (int i = 0; i < pi.length; i++) {
+            rec[i] = row[pi[i]];
+        }
+        satisfyingHostRecords.add(new Array<String>(rec));
+    }
+
+    private static TupleFilter buildInFilter(TblColRef[] hostCols, 
Set<Array<String>> satisfyingHostRecords) {
+        if (satisfyingHostRecords.size() == 0) {
+            return ConstantTupleFilter.FALSE;
+        }
+
+        int hn = hostCols.length;
+        if (hn == 1) {
+            CompareTupleFilter in = new 
CompareTupleFilter(FilterOperatorEnum.IN);
+            in.addChild(new ColumnTupleFilter(hostCols[0]));
+            in.addChild(new 
ConstantTupleFilter(asValues(satisfyingHostRecords)));
+            return in;
+        } else {
+            LogicalTupleFilter or = new 
LogicalTupleFilter(FilterOperatorEnum.OR);
+            for (Array<String> rec : satisfyingHostRecords) {
+                LogicalTupleFilter and = new 
LogicalTupleFilter(FilterOperatorEnum.AND);
+                for (int i = 0; i < hn; i++) {
+                    CompareTupleFilter eq = new 
CompareTupleFilter(FilterOperatorEnum.EQ);
+                    eq.addChild(new ColumnTupleFilter(hostCols[i]));
+                    eq.addChild(new ConstantTupleFilter(rec.data[i]));
+                    and.addChild(eq);
+                }
+                or.addChild(and);
+            }
+            return or;
+        }
+    }
+
+    private static List<String> asValues(Set<Array<String>> 
satisfyingHostRecords) {
+        List<String> values = 
Lists.newArrayListWithCapacity(satisfyingHostRecords.size());
+        for (Array<String> rec : satisfyingHostRecords) {
+            values.add(rec.data[0]);
+        }
+        return values;
+    }
+
+    private static LogicalTupleFilter buildRangeFilter(TblColRef[] hostCols, 
Set<Array<String>> satisfyingHostRecords) {
+        int hn = hostCols.length;
+        String[] min = new String[hn];
+        String[] max = new String[hn];
+        findMinMax(satisfyingHostRecords, hostCols, min, max);
+        LogicalTupleFilter and = new 
LogicalTupleFilter(FilterOperatorEnum.AND);
+        for (int i = 0; i < hn; i++) {
+            CompareTupleFilter compMin = new 
CompareTupleFilter(FilterOperatorEnum.GTE);
+            compMin.addChild(new ColumnTupleFilter(hostCols[i]));
+            compMin.addChild(new ConstantTupleFilter(min[i]));
+            and.addChild(compMin);
+            CompareTupleFilter compMax = new 
CompareTupleFilter(FilterOperatorEnum.LTE);
+            compMax.addChild(new ColumnTupleFilter(hostCols[i]));
+            compMax.addChild(new ConstantTupleFilter(max[i]));
+            and.addChild(compMax);
+        }
+        return and;
+    }
+
+    private static void findMinMax(Set<Array<String>> satisfyingHostRecords, 
TblColRef[] hostCols, String[] min, String[] max) {
+
+        RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length];
+        for (int i = 0; i < hostCols.length; i++) {
+            orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType());
+        }
+
+        for (Array<String> rec : satisfyingHostRecords) {
+            String[] row = rec.data;
+            for (int i = 0; i < row.length; i++) {
+                min[i] = orders[i].min(min[i], row[i]);
+                max[i] = orders[i].max(max[i], row[i]);
+            }
+        }
+    }
+
+    private static class SingleColumnTuple implements IEvaluatableTuple {
+
+        private TblColRef col;
+        private String value;
+
+        SingleColumnTuple(TblColRef col) {
+            this.col = col;
+        }
+
+        @Override
+        public Object getValue(TblColRef col) {
+            if (this.col.equals(col))
+                return value;
+            else
+                throw new IllegalArgumentException("unexpected column " + col);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
index 1e05eb8..bfa398e 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java
@@ -1,132 +1,132 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class FuzzyValueCombination {
-
-    private static class Dim<E> {
-        TblColRef col;
-        Set<E> values;
-    }
-
-    private static final Set SINGLE_NULL_SET = Sets.newHashSet();
-
-    static {
-        SINGLE_NULL_SET.add(null);
-    }
-
-    public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> 
fuzzyValues, long cap) {
-        Collections.emptyMap();
-        Dim[] dims = toDims(fuzzyValues);
-        // If a query has many IN clause and each IN clause has many values, 
then it will easily generate 
-        // thousands of fuzzy keys. When there are lots of fuzzy keys, the 
scan performance is bottle necked 
-        // on it. So simply choose to abandon all fuzzy keys in this case.
-        if (exceedCap(dims, cap)) {
-            return Lists.newArrayList();
-        } else {
-            return combination(dims);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) {
-
-        List<Map<TblColRef, E>> result = Lists.newArrayList();
-
-        int emptyDims = 0;
-        for (Dim dim : dims) {
-            if (dim.values.isEmpty()) {
-                dim.values = SINGLE_NULL_SET;
-                emptyDims++;
-            }
-        }
-        if (emptyDims == dims.length) {
-            return result;
-        }
-
-        Map<TblColRef, E> r = Maps.newHashMap();
-        Iterator<E>[] iters = new Iterator[dims.length];
-        int level = 0;
-        while (true) {
-            Dim dim = dims[level];
-            if (iters[level] == null) {
-                iters[level] = dim.values.iterator();
-            }
-
-            Iterator<E> it = iters[level];
-            if (it.hasNext() == false) {
-                if (level == 0)
-                    break;
-                r.remove(dim.col);
-                iters[level] = null;
-                level--;
-                continue;
-            }
-
-            r.put(dim.col, it.next());
-            if (level == dims.length - 1) {
-                result.add(new HashMap<TblColRef, E>(r));
-            } else {
-                level++;
-            }
-        }
-        return result;
-    }
-
-    private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) {
-        Dim[] dims = new Dim[fuzzyValues.size()];
-        int i = 0;
-        for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) {
-            dims[i] = new Dim();
-            dims[i].col = entry.getKey();
-            dims[i].values = entry.getValue();
-            if (dims[i].values == null)
-                dims[i].values = Collections.emptySet();
-            i++;
-        }
-        return dims;
-    }
-
-    private static boolean exceedCap(Dim[] dims, long cap) {
-        return combCount(dims) > cap;
-    }
-
-    private static long combCount(Dim[] dims) {
-        long count = 1;
-        for (Dim dim : dims) {
-            count *= Math.max(dim.values.size(), 1);
-        }
-        return count;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.translate;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class FuzzyValueCombination {
+
+    private static class Dim<E> {
+        TblColRef col;
+        Set<E> values;
+    }
+
+    private static final Set SINGLE_NULL_SET = Sets.newHashSet();
+
+    static {
+        SINGLE_NULL_SET.add(null);
+    }
+
+    public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> 
fuzzyValues, long cap) {
+        Collections.emptyMap();
+        Dim[] dims = toDims(fuzzyValues);
+        // If a query has many IN clause and each IN clause has many values, 
then it will easily generate 
+        // thousands of fuzzy keys. When there are lots of fuzzy keys, the 
scan performance is bottle necked 
+        // on it. So simply choose to abandon all fuzzy keys in this case.
+        if (exceedCap(dims, cap)) {
+            return Lists.newArrayList();
+        } else {
+            return combination(dims);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) {
+
+        List<Map<TblColRef, E>> result = Lists.newArrayList();
+
+        int emptyDims = 0;
+        for (Dim dim : dims) {
+            if (dim.values.isEmpty()) {
+                dim.values = SINGLE_NULL_SET;
+                emptyDims++;
+            }
+        }
+        if (emptyDims == dims.length) {
+            return result;
+        }
+
+        Map<TblColRef, E> r = Maps.newHashMap();
+        Iterator<E>[] iters = new Iterator[dims.length];
+        int level = 0;
+        while (true) {
+            Dim dim = dims[level];
+            if (iters[level] == null) {
+                iters[level] = dim.values.iterator();
+            }
+
+            Iterator<E> it = iters[level];
+            if (it.hasNext() == false) {
+                if (level == 0)
+                    break;
+                r.remove(dim.col);
+                iters[level] = null;
+                level--;
+                continue;
+            }
+
+            r.put(dim.col, it.next());
+            if (level == dims.length - 1) {
+                result.add(new HashMap<TblColRef, E>(r));
+            } else {
+                level++;
+            }
+        }
+        return result;
+    }
+
+    private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) {
+        Dim[] dims = new Dim[fuzzyValues.size()];
+        int i = 0;
+        for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) {
+            dims[i] = new Dim();
+            dims[i].col = entry.getKey();
+            dims[i].values = entry.getValue();
+            if (dims[i].values == null)
+                dims[i].values = Collections.emptySet();
+            i++;
+        }
+        return dims;
+    }
+
+    private static boolean exceedCap(Dim[] dims, long cap) {
+        return combCount(dims) > cap;
+    }
+
+    private static long combCount(Dim[] dims) {
+        long count = 1;
+        for (Dim dim : dims) {
+            count *= Math.max(dim.values.size(), 1);
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
index 2193eab..074b271 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -1,77 +1,77 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
 import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class DFSFileTable implements ReadableTable {
-
-    public static final String DELIM_AUTO = "auto";
-    public static final String DELIM_COMMA = ",";
-
-    String path;
-    String delim;
-    int nColumns;
-
-    public DFSFileTable(String path, int nColumns) {
-        this(path, DELIM_AUTO, nColumns);
-    }
-
-    public DFSFileTable(String path, String delim, int nColumns) {
-        this.path = path;
-        this.delim = delim;
-        this.nColumns = nColumns;
-    }
-
-    public String getColumnDelimeter() {
-        return delim;
-    }
-
-    @Override
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ */
+public class DFSFileTable implements ReadableTable {
+
+    public static final String DELIM_AUTO = "auto";
+    public static final String DELIM_COMMA = ",";
+
+    String path;
+    String delim;
+    int nColumns;
+
+    public DFSFileTable(String path, int nColumns) {
+        this(path, DELIM_AUTO, nColumns);
+    }
+
+    public DFSFileTable(String path, String delim, int nColumns) {
+        this.path = path;
+        this.delim = delim;
+        this.nColumns = nColumns;
+    }
+
+    public String getColumnDelimeter() {
+        return delim;
+    }
+
+    @Override
     @Deprecated
-    public TableReader getReader() throws IOException {
-        return new DFSFileTableReader(path, delim, nColumns);
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        try {
-            Pair<Long, Long> sizeAndLastModified = 
getSizeAndLastModified(path);
-            return new TableSignature(path, sizeAndLastModified.getFirst(), 
sizeAndLastModified.getSecond());
-        } catch (FileNotFoundException ex) {
-            return null;
-        }
-    }
-
+    public TableReader getReader() throws IOException {
+        return new DFSFileTableReader(path, delim, nColumns);
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        try {
+            Pair<Long, Long> sizeAndLastModified = 
getSizeAndLastModified(path);
+            return new TableSignature(path, sizeAndLastModified.getFirst(), 
sizeAndLastModified.getSecond());
+        } catch (FileNotFoundException ex) {
+            return null;
+        }
+    }
+
     public Collection<TableReader> getReaders() {
         ArrayList<TableReader> readers = new ArrayList<>();
         try {
@@ -95,33 +95,33 @@ public class DFSFileTable implements ReadableTable {
         return readers;
     }
 
-    @Override
-    public String toString() {
-        return path;
-    }
-
-    public static Pair<Long, Long> getSizeAndLastModified(String path) throws 
IOException {
-        FileSystem fs = HadoopUtil.getFileSystem(path);
-
-        // get all contained files if path is directory
-        ArrayList<FileStatus> allFiles = new ArrayList<>();
-        FileStatus status = fs.getFileStatus(new Path(path));
-        if (status.isFile()) {
-            allFiles.add(status);
-        } else {
-            FileStatus[] listStatus = fs.listStatus(new Path(path));
-            allFiles.addAll(Arrays.asList(listStatus));
-        }
-
-        long size = 0;
-        long lastModified = 0;
-        for (FileStatus file : allFiles) {
-            size += file.getLen();
-            lastModified = Math.max(lastModified, file.getModificationTime());
-        }
-
-        return Pair.newPair(size, lastModified);
-    }
+    @Override
+    public String toString() {
+        return path;
+    }
+
+    public static Pair<Long, Long> getSizeAndLastModified(String path) throws 
IOException {
+        FileSystem fs = HadoopUtil.getFileSystem(path);
+
+        // get all contained files if path is directory
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(path));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(path));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        long size = 0;
+        long lastModified = 0;
+        for (FileStatus file : allFiles) {
+            size += file.getLen();
+            lastModified = Math.max(lastModified, file.getModificationTime());
+        }
+
+        return Pair.newPair(size, lastModified);
+    }
 
     private boolean isExceptionSayingNotSeqFile(IOException e) {
         if (e.getMessage() != null && e.getMessage().contains("not a 
SequenceFile"))
@@ -132,4 +132,4 @@ public class DFSFileTable implements ReadableTable {
 
         return false;
     }
-}
+}

Reply via email to