http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java index db1a170..f2edcb4 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java @@ -1,285 +1,285 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import com.google.common.base.Strings; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.ArrayUtils; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.dict.StringBytesConverter; -import org.apache.kylin.dict.TrieDictionary; -import org.apache.kylin.dict.TrieDictionaryBuilder; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * @author yangli9 - */ -@SuppressWarnings("serial") -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class SnapshotTable extends RootPersistentEntity implements ReadableTable { - - @JsonProperty("tableName") - private String tableName; - @JsonProperty("signature") - private TableSignature signature; - @JsonProperty("useDictionary") - private boolean useDictionary; - - private ArrayList<int[]> rowIndices; - private Dictionary<String> dict; - - // default constructor for JSON serialization - public SnapshotTable() { - } - - SnapshotTable(ReadableTable table, String tableName) throws IOException { - this.tableName = tableName; - this.signature = table.getSignature(); - this.useDictionary = true; - } - - public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { - this.signature = table.getSignature(); - - int maxIndex = tableDesc.getMaxColumnIndex(); - - TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter()); - - TableReader reader = table.getReader(); - try { - while (reader.next()) { - String[] row = reader.getRow(); - if (row.length <= maxIndex) { - throw new IllegalStateException("Bad hive table row, " + tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + Arrays.toString(row)); - } - for (ColumnDesc column : tableDesc.getColumns()) { - String cell = row[column.getZeroBasedIndex()]; - if (cell != null) - b.addValue(cell); - } - } - } finally { - IOUtils.closeQuietly(reader); - } - - this.dict = b.build(0); - - ArrayList<int[]> allRowIndices = new ArrayList<int[]>(); - reader = table.getReader(); - try { - while (reader.next()) { - String[] row = reader.getRow(); - int[] rowIndex = new int[tableDesc.getColumnCount()]; - for (ColumnDesc column : tableDesc.getColumns()) { - rowIndex[column.getZeroBasedIndex()] = dict.getIdFromValue(row[column.getZeroBasedIndex()]); - } - allRowIndices.add(rowIndex); - } - } finally { - IOUtils.closeQuietly(reader); - } - - this.rowIndices = allRowIndices; - } - - public String getResourcePath() { - return getResourceDir() + "/" + uuid + ".snapshot"; - } - - public String getResourceDir() { - if (Strings.isNullOrEmpty(tableName)) { - return getOldResourceDir(); - } else { - return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName; - } - } - - private String getOldResourceDir() { - return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new File(signature.getPath()).getName(); - } - - @Override - public TableReader getReader() throws IOException { - return new TableReader() { - - int i = -1; - - @Override - public boolean next() throws IOException { - i++; - return i < rowIndices.size(); - } - - @Override - public String[] getRow() { - int[] rowIndex = rowIndices.get(i); - String[] row = new String[rowIndex.length]; - for (int x = 0; x < row.length; x++) { - row[x] = dict.getValueFromId(rowIndex[x]); - } - return row; - } - - @Override - public void close() throws IOException { - } - }; - } - - @Override - public TableSignature getSignature() throws IOException { - return signature; - } - - /** - * a naive implementation - * - * @return - */ - @Override - public int hashCode() { - int[] parts = new int[this.rowIndices.size()]; - for (int i = 0; i < parts.length; ++i) - parts[i] = Arrays.hashCode(this.rowIndices.get(i)); - return Arrays.hashCode(parts); - } - - @Override - public boolean equals(Object o) { - if ((o instanceof SnapshotTable) == false) - return false; - SnapshotTable that = (SnapshotTable) o; - - if (this.dict.equals(that.dict) == false) - return false; - - //compare row by row - if (this.rowIndices.size() != that.rowIndices.size()) - return false; - for (int i = 0; i < this.rowIndices.size(); ++i) { - if (!ArrayUtils.isEquals(this.rowIndices.get(i), that.rowIndices.get(i))) - return false; - } - - return true; - } - - private static String NULL_STR; - { - try { - // a special placeholder to indicate a NULL; 0, 9, 127, 255 are a few invisible ASCII characters - NULL_STR = new String(new byte[] { 0, 9, 127, (byte) 255 }, "ISO-8859-1"); - } catch (UnsupportedEncodingException e) { - // does not happen - } - } - - void writeData(DataOutput out) throws IOException { - out.writeInt(rowIndices.size()); - if (rowIndices.size() > 0) { - int n = rowIndices.get(0).length; - out.writeInt(n); - - if (this.useDictionary == true) { - dict.write(out); - for (int i = 0; i < rowIndices.size(); i++) { - int[] row = rowIndices.get(i); - for (int j = 0; j < n; j++) { - out.writeInt(row[j]); - } - } - - } else { - for (int i = 0; i < rowIndices.size(); i++) { - int[] row = rowIndices.get(i); - for (int j = 0; j < n; j++) { - // NULL_STR is tricky, but we don't want to break the current snapshots - out.writeUTF(dict.getValueFromId(row[j]) == null ? NULL_STR : dict.getValueFromId(row[j])); - } - } - } - } - } - - void readData(DataInput in) throws IOException { - int rowNum = in.readInt(); - if (rowNum > 0) { - int n = in.readInt(); - rowIndices = new ArrayList<int[]>(rowNum); - - if (this.useDictionary == true) { - this.dict = new TrieDictionary<String>(); - dict.readFields(in); - - for (int i = 0; i < rowNum; i++) { - int[] row = new int[n]; - this.rowIndices.add(row); - for (int j = 0; j < n; j++) { - row[j] = in.readInt(); - } - } - } else { - List<String[]> rows = new ArrayList<String[]>(rowNum); - TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter()); - - for (int i = 0; i < rowNum; i++) { - String[] row = new String[n]; - rows.add(row); - for (int j = 0; j < n; j++) { - row[j] = in.readUTF(); - // NULL_STR is tricky, but we don't want to break the current snapshots - if (row[j].equals(NULL_STR)) - row[j] = null; - - b.addValue(row[j]); - } - } - this.dict = b.build(0); - for (String[] row : rows) { - int[] rowIndex = new int[n]; - for (int i = 0; i < n; i++) { - rowIndex[i] = dict.getIdFromValue(row[i]); - } - this.rowIndices.add(rowIndex); - } - } - } else { - rowIndices = new ArrayList<int[]>(); - dict = new TrieDictionary<String>(); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.lookup; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.google.common.base.Strings; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.dict.StringBytesConverter; +import org.apache.kylin.dict.TrieDictionary; +import org.apache.kylin.dict.TrieDictionaryBuilder; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.ReadableTable; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * @author yangli9 + */ +@SuppressWarnings("serial") +@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) +public class SnapshotTable extends RootPersistentEntity implements ReadableTable { + + @JsonProperty("tableName") + private String tableName; + @JsonProperty("signature") + private TableSignature signature; + @JsonProperty("useDictionary") + private boolean useDictionary; + + private ArrayList<int[]> rowIndices; + private Dictionary<String> dict; + + // default constructor for JSON serialization + public SnapshotTable() { + } + + SnapshotTable(ReadableTable table, String tableName) throws IOException { + this.tableName = tableName; + this.signature = table.getSignature(); + this.useDictionary = true; + } + + public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { + this.signature = table.getSignature(); + + int maxIndex = tableDesc.getMaxColumnIndex(); + + TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter()); + + TableReader reader = table.getReader(); + try { + while (reader.next()) { + String[] row = reader.getRow(); + if (row.length <= maxIndex) { + throw new IllegalStateException("Bad hive table row, " + tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + Arrays.toString(row)); + } + for (ColumnDesc column : tableDesc.getColumns()) { + String cell = row[column.getZeroBasedIndex()]; + if (cell != null) + b.addValue(cell); + } + } + } finally { + IOUtils.closeQuietly(reader); + } + + this.dict = b.build(0); + + ArrayList<int[]> allRowIndices = new ArrayList<int[]>(); + reader = table.getReader(); + try { + while (reader.next()) { + String[] row = reader.getRow(); + int[] rowIndex = new int[tableDesc.getColumnCount()]; + for (ColumnDesc column : tableDesc.getColumns()) { + rowIndex[column.getZeroBasedIndex()] = dict.getIdFromValue(row[column.getZeroBasedIndex()]); + } + allRowIndices.add(rowIndex); + } + } finally { + IOUtils.closeQuietly(reader); + } + + this.rowIndices = allRowIndices; + } + + public String getResourcePath() { + return getResourceDir() + "/" + uuid + ".snapshot"; + } + + public String getResourceDir() { + if (Strings.isNullOrEmpty(tableName)) { + return getOldResourceDir(); + } else { + return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName; + } + } + + private String getOldResourceDir() { + return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new File(signature.getPath()).getName(); + } + + @Override + public TableReader getReader() throws IOException { + return new TableReader() { + + int i = -1; + + @Override + public boolean next() throws IOException { + i++; + return i < rowIndices.size(); + } + + @Override + public String[] getRow() { + int[] rowIndex = rowIndices.get(i); + String[] row = new String[rowIndex.length]; + for (int x = 0; x < row.length; x++) { + row[x] = dict.getValueFromId(rowIndex[x]); + } + return row; + } + + @Override + public void close() throws IOException { + } + }; + } + + @Override + public TableSignature getSignature() throws IOException { + return signature; + } + + /** + * a naive implementation + * + * @return + */ + @Override + public int hashCode() { + int[] parts = new int[this.rowIndices.size()]; + for (int i = 0; i < parts.length; ++i) + parts[i] = Arrays.hashCode(this.rowIndices.get(i)); + return Arrays.hashCode(parts); + } + + @Override + public boolean equals(Object o) { + if ((o instanceof SnapshotTable) == false) + return false; + SnapshotTable that = (SnapshotTable) o; + + if (this.dict.equals(that.dict) == false) + return false; + + //compare row by row + if (this.rowIndices.size() != that.rowIndices.size()) + return false; + for (int i = 0; i < this.rowIndices.size(); ++i) { + if (!ArrayUtils.isEquals(this.rowIndices.get(i), that.rowIndices.get(i))) + return false; + } + + return true; + } + + private static String NULL_STR; + { + try { + // a special placeholder to indicate a NULL; 0, 9, 127, 255 are a few invisible ASCII characters + NULL_STR = new String(new byte[] { 0, 9, 127, (byte) 255 }, "ISO-8859-1"); + } catch (UnsupportedEncodingException e) { + // does not happen + } + } + + void writeData(DataOutput out) throws IOException { + out.writeInt(rowIndices.size()); + if (rowIndices.size() > 0) { + int n = rowIndices.get(0).length; + out.writeInt(n); + + if (this.useDictionary == true) { + dict.write(out); + for (int i = 0; i < rowIndices.size(); i++) { + int[] row = rowIndices.get(i); + for (int j = 0; j < n; j++) { + out.writeInt(row[j]); + } + } + + } else { + for (int i = 0; i < rowIndices.size(); i++) { + int[] row = rowIndices.get(i); + for (int j = 0; j < n; j++) { + // NULL_STR is tricky, but we don't want to break the current snapshots + out.writeUTF(dict.getValueFromId(row[j]) == null ? NULL_STR : dict.getValueFromId(row[j])); + } + } + } + } + } + + void readData(DataInput in) throws IOException { + int rowNum = in.readInt(); + if (rowNum > 0) { + int n = in.readInt(); + rowIndices = new ArrayList<int[]>(rowNum); + + if (this.useDictionary == true) { + this.dict = new TrieDictionary<String>(); + dict.readFields(in); + + for (int i = 0; i < rowNum; i++) { + int[] row = new int[n]; + this.rowIndices.add(row); + for (int j = 0; j < n; j++) { + row[j] = in.readInt(); + } + } + } else { + List<String[]> rows = new ArrayList<String[]>(rowNum); + TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter()); + + for (int i = 0; i < rowNum; i++) { + String[] row = new String[n]; + rows.add(row); + for (int j = 0; j < n; j++) { + row[j] = in.readUTF(); + // NULL_STR is tricky, but we don't want to break the current snapshots + if (row[j].equals(NULL_STR)) + row[j] = null; + + b.addValue(row[j]); + } + } + this.dict = b.build(0); + for (String[] row : rows) { + int[] rowIndex = new int[n]; + for (int i = 0; i < n; i++) { + rowIndex[i] = dict.getIdFromValue(row[i]); + } + this.rowIndices.add(rowIndex); + } + } + } else { + rowIndices = new ArrayList<int[]>(); + dict = new TrieDictionary<String>(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java index f5663a5..02164d6 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java @@ -1,79 +1,79 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.common.util.JsonUtil; - -/* - * Copyright 2013-2014 eBay Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @author yangli9 - * - */ -public class SnapshotTableSerializer implements Serializer<SnapshotTable> { - - public static final SnapshotTableSerializer FULL_SERIALIZER = new SnapshotTableSerializer(false); - public static final SnapshotTableSerializer INFO_SERIALIZER = new SnapshotTableSerializer(true); - - private boolean infoOnly; - - SnapshotTableSerializer(boolean infoOnly) { - this.infoOnly = infoOnly; - } - - @Override - public void serialize(SnapshotTable obj, DataOutputStream out) throws IOException { - String json = JsonUtil.writeValueAsIndentString(obj); - out.writeUTF(json); - - if (infoOnly == false) - obj.writeData(out); - } - - @Override - public SnapshotTable deserialize(DataInputStream in) throws IOException { - String json = in.readUTF(); - SnapshotTable obj = JsonUtil.readValue(json, SnapshotTable.class); - - if (infoOnly == false) - obj.readData(in); - - return obj; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.lookup; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.JsonUtil; + +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @author yangli9 + * + */ +public class SnapshotTableSerializer implements Serializer<SnapshotTable> { + + public static final SnapshotTableSerializer FULL_SERIALIZER = new SnapshotTableSerializer(false); + public static final SnapshotTableSerializer INFO_SERIALIZER = new SnapshotTableSerializer(true); + + private boolean infoOnly; + + SnapshotTableSerializer(boolean infoOnly) { + this.infoOnly = infoOnly; + } + + @Override + public void serialize(SnapshotTable obj, DataOutputStream out) throws IOException { + String json = JsonUtil.writeValueAsIndentString(obj); + out.writeUTF(json); + + if (infoOnly == false) + obj.writeData(out); + } + + @Override + public SnapshotTable deserialize(DataInputStream in) throws IOException { + String json = in.readUTF(); + SnapshotTable obj = JsonUtil.readValue(json, SnapshotTable.class); + + if (infoOnly == false) + obj.readData(in); + + return obj; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java index 4266f2a..5e1705a 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java @@ -59,7 +59,7 @@ public class AppendTrieDictionaryTest { KylinConfig config = KylinConfig.getInstanceFromEnv(); config.setAppendDictEntrySize(50000); config.setAppendDictCacheSize(3); - config.setProperty("kylin.hdfs.working.dir", BASE_DIR); + config.setProperty("kylin.env.hdfs-working-dir", BASE_DIR); } @AfterClass http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 3937a24..be07d76 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -57,9 +57,9 @@ import com.google.common.collect.Maps; * schedule the cubing jobs when several job server running with the same metadata. * * to enable the distributed job server, you need to set and update three configs in the kylin.properties: - * 1. kylin.enable.scheduler=2 - * 2. kylin.job.controller.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock - * 3. add all the job servers and query servers to the kylin.rest.servers + * 1. kylin.job.scheduler.default=2 + * 2. kylin.job.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock + * 3. add all the job servers and query servers to the kylin.server.cluster-servers */ public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener { private ExecutableManager executableManager; http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java index 0fc96c3..500a5b5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java @@ -1,52 +1,52 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metadata; - -/** - * Constances to describe metadata and it's change. - * - */ -public interface MetadataConstants { - - public static final String FILE_SURFIX = ".json"; - - // Extended attribute keys - public static final String TABLE_EXD_STATUS_KEY = "EXD_STATUS"; - public static final String TABLE_EXD_MINFS = "minFileSize"; - public static final String TABLE_EXD_TNF = "totalNumberFiles"; - public static final String TABLE_EXD_LOCATION = "location"; - public static final String TABLE_EXD_LUT = "lastUpdateTime"; - public static final String TABLE_EXD_LAT = "lastAccessTime"; - public static final String TABLE_EXD_COLUMN = "columns"; - public static final String TABLE_EXD_PC = "partitionColumns"; - public static final String TABLE_EXD_MAXFS = "maxFileSize"; - public static final String TABLE_EXD_IF = "inputformat"; - public static final String TABLE_EXD_PARTITIONED = "partitioned"; - public static final String TABLE_EXD_TABLENAME = "tableName"; - public static final String TABLE_EXD_OWNER = "owner"; - public static final String TABLE_EXD_TFS = "totalFileSize"; - public static final String TABLE_EXD_OF = "outputformat"; - /** - * The value is an array - */ - public static final String TABLE_EXD_CARDINALITY = "cardinality"; - public static final String TABLE_EXD_DELIM = "delim"; - public static final String TABLE_EXD_DEFAULT_VALUE = "unknown"; - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata; + +/** + * Constances to describe metadata and it's change. + * + */ +public interface MetadataConstants { + + public static final String FILE_SURFIX = ".json"; + + // Extended attribute keys + public static final String TABLE_EXD_STATUS_KEY = "EXD_STATUS"; + public static final String TABLE_EXD_MINFS = "minFileSize"; + public static final String TABLE_EXD_TNF = "totalNumberFiles"; + public static final String TABLE_EXD_LOCATION = "location"; + public static final String TABLE_EXD_LUT = "lastUpdateTime"; + public static final String TABLE_EXD_LAT = "lastAccessTime"; + public static final String TABLE_EXD_COLUMN = "columns"; + public static final String TABLE_EXD_PC = "partitionColumns"; + public static final String TABLE_EXD_MAXFS = "maxFileSize"; + public static final String TABLE_EXD_IF = "inputformat"; + public static final String TABLE_EXD_PARTITIONED = "partitioned"; + public static final String TABLE_EXD_TABLENAME = "tableName"; + public static final String TABLE_EXD_OWNER = "owner"; + public static final String TABLE_EXD_TFS = "totalFileSize"; + public static final String TABLE_EXD_OF = "outputformat"; + /** + * The value is an array + */ + public static final String TABLE_EXD_CARDINALITY = "cardinality"; + public static final String TABLE_EXD_DELIM = "delim"; + public static final String TABLE_EXD_DEFAULT_VALUE = "unknown"; + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 8d34cc0..6bc5771 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -102,7 +102,7 @@ public class Broadcaster { final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { - logger.warn("There is no available rest server; check the 'kylin.rest.servers' config"); + logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config"); broadcastEvents = null; // disable the broadcaster return; } http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java index 630156f..5d0409a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java @@ -1,77 +1,77 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metadata.model; - -import org.apache.kylin.metadata.model.DataModelDesc.TableKind; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.model; + +import org.apache.kylin.metadata.model.DataModelDesc.TableKind; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class JoinTableDesc { - - @JsonProperty("table") - private String table; - - @JsonProperty("kind") - @JsonInclude(JsonInclude.Include.NON_NULL) - private TableKind kind = TableKind.LOOKUP; - +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) +public class JoinTableDesc { + + @JsonProperty("table") + private String table; + + @JsonProperty("kind") + @JsonInclude(JsonInclude.Include.NON_NULL) + private TableKind kind = TableKind.LOOKUP; + @JsonProperty("alias") @JsonInclude(JsonInclude.Include.NON_NULL) private String alias; - @JsonProperty("join") - private JoinDesc join; - + @JsonProperty("join") + private JoinDesc join; + private TableRef tableRef; - - public String getTable() { - return table; - } - + + public String getTable() { + return table; + } + void setTable(String table) { - this.table = table; - } - - public TableKind getKind() { - return kind; - } - + this.table = table; + } + + public TableKind getKind() { + return kind; + } + public String getAlias() { return alias; } - public JoinDesc getJoin() { - return join; - } - + public JoinDesc getJoin() { + return join; + } + public TableRef getTableRef() { return tableRef; - } - + } + void setTableRef(TableRef ref) { this.tableRef = ref; - } - - - -} + } + + + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java index 742cfba..c4042c6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java @@ -1,43 +1,43 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metadata.tuple; - -import java.util.List; - -import org.apache.kylin.metadata.model.TblColRef; - -/** - * Tuple is a record row, contains multiple values being lookup by either field - * (calcite notion) or column (kylin notion). - * - * @author yangli9 - */ -public interface ITuple extends IEvaluatableTuple, Cloneable { - - List<String> getAllFields(); - - List<TblColRef> getAllColumns(); - - Object[] getAllValues(); - - ITuple makeCopy(); - - // declared from IEvaluatableTuple: public Object getValue(TblColRef col); - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.tuple; + +import java.util.List; + +import org.apache.kylin.metadata.model.TblColRef; + +/** + * Tuple is a record row, contains multiple values being lookup by either field + * (calcite notion) or column (kylin notion). + * + * @author yangli9 + */ +public interface ITuple extends IEvaluatableTuple, Cloneable { + + List<String> getAllFields(); + + List<TblColRef> getAllColumns(); + + Object[] getAllValues(); + + ITuple makeCopy(); + + // declared from IEvaluatableTuple: public Object getValue(TblColRef col); + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index b338b3c..719cab6 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -131,7 +131,7 @@ public class StorageContext { if (!realization.supportsLimitPushDown()) { logger.info("Not enabling limit push down because cube storage type not supported"); } else if (tempPushDownLimit > pushDownLimitMax) { - logger.info("Not enabling limit push down because the limit(including offset) {} is larger than kylin.query.pushdown.limit.max {}", // + logger.info("Not enabling limit push down because the limit(including offset) {} is larger than kylin.query.max-limit-pushdown {}", // tempPushDownLimit, pushDownLimitMax); } else { this.finalPushDownLimit = tempPushDownLimit; http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java index 83fa32d..160338f 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java @@ -1,200 +1,200 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.translate; - -import java.util.List; -import java.util.Set; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.kv.RowKeyColumnOrder; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.cube.model.CubeDesc.DeriveType; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.StringCodeSystem; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.IEvaluatableTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * @author yangli9 - */ -public class DerivedFilterTranslator { - - private static final Logger logger = LoggerFactory.getLogger(DerivedFilterTranslator.class); - - public static Pair<TupleFilter, Boolean> translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) { - - TblColRef derivedCol = compf.getColumn(); - TblColRef[] hostCols = hostInfo.columns; - TblColRef[] pkCols = hostInfo.dimension.getJoin().getPrimaryKeyColumns(); - - if (hostInfo.type == DeriveType.PK_FK) { - assert hostCols.length == 1; - CompareTupleFilter newComp = new CompareTupleFilter(compf.getOperator()); - newComp.addChild(new ColumnTupleFilter(hostCols[0])); - newComp.addChild(new ConstantTupleFilter(compf.getValues())); - return new Pair<TupleFilter, Boolean>(newComp, false); - } - - assert hostInfo.type == DeriveType.LOOKUP; - assert hostCols.length == pkCols.length; - - int di = derivedCol.getColumnDesc().getZeroBasedIndex(); - int[] pi = new int[pkCols.length]; - int hn = hostCols.length; - for (int i = 0; i < hn; i++) { - pi[i] = pkCols[i].getColumnDesc().getZeroBasedIndex(); - } - - Set<Array<String>> satisfyingHostRecords = Sets.newHashSet(); - SingleColumnTuple tuple = new SingleColumnTuple(derivedCol); - for (String[] row : lookup.getAllRows()) { - tuple.value = row[di]; - if (compf.evaluate(tuple, StringCodeSystem.INSTANCE)) { - collect(row, pi, satisfyingHostRecords); - } - } - - TupleFilter translated; - boolean loosened; - if (satisfyingHostRecords.size() > KylinConfig.getInstanceFromEnv().getDerivedInThreshold()) { - logger.info("Deciding to loosen filter on derived filter as host candidates number {} exceeds threshold {}", // - satisfyingHostRecords.size(), KylinConfig.getInstanceFromEnv().getDerivedInThreshold() - ); - translated = buildRangeFilter(hostCols, satisfyingHostRecords); - loosened = true; - } else { - translated = buildInFilter(hostCols, satisfyingHostRecords); - loosened = false; - } - - return new Pair<TupleFilter, Boolean>(translated, loosened); - } - - private static void collect(String[] row, int[] pi, Set<Array<String>> satisfyingHostRecords) { - // TODO when go beyond IN_THRESHOLD, only keep min/max is enough - String[] rec = new String[pi.length]; - for (int i = 0; i < pi.length; i++) { - rec[i] = row[pi[i]]; - } - satisfyingHostRecords.add(new Array<String>(rec)); - } - - private static TupleFilter buildInFilter(TblColRef[] hostCols, Set<Array<String>> satisfyingHostRecords) { - if (satisfyingHostRecords.size() == 0) { - return ConstantTupleFilter.FALSE; - } - - int hn = hostCols.length; - if (hn == 1) { - CompareTupleFilter in = new CompareTupleFilter(FilterOperatorEnum.IN); - in.addChild(new ColumnTupleFilter(hostCols[0])); - in.addChild(new ConstantTupleFilter(asValues(satisfyingHostRecords))); - return in; - } else { - LogicalTupleFilter or = new LogicalTupleFilter(FilterOperatorEnum.OR); - for (Array<String> rec : satisfyingHostRecords) { - LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND); - for (int i = 0; i < hn; i++) { - CompareTupleFilter eq = new CompareTupleFilter(FilterOperatorEnum.EQ); - eq.addChild(new ColumnTupleFilter(hostCols[i])); - eq.addChild(new ConstantTupleFilter(rec.data[i])); - and.addChild(eq); - } - or.addChild(and); - } - return or; - } - } - - private static List<String> asValues(Set<Array<String>> satisfyingHostRecords) { - List<String> values = Lists.newArrayListWithCapacity(satisfyingHostRecords.size()); - for (Array<String> rec : satisfyingHostRecords) { - values.add(rec.data[0]); - } - return values; - } - - private static LogicalTupleFilter buildRangeFilter(TblColRef[] hostCols, Set<Array<String>> satisfyingHostRecords) { - int hn = hostCols.length; - String[] min = new String[hn]; - String[] max = new String[hn]; - findMinMax(satisfyingHostRecords, hostCols, min, max); - LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND); - for (int i = 0; i < hn; i++) { - CompareTupleFilter compMin = new CompareTupleFilter(FilterOperatorEnum.GTE); - compMin.addChild(new ColumnTupleFilter(hostCols[i])); - compMin.addChild(new ConstantTupleFilter(min[i])); - and.addChild(compMin); - CompareTupleFilter compMax = new CompareTupleFilter(FilterOperatorEnum.LTE); - compMax.addChild(new ColumnTupleFilter(hostCols[i])); - compMax.addChild(new ConstantTupleFilter(max[i])); - and.addChild(compMax); - } - return and; - } - - private static void findMinMax(Set<Array<String>> satisfyingHostRecords, TblColRef[] hostCols, String[] min, String[] max) { - - RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length]; - for (int i = 0; i < hostCols.length; i++) { - orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType()); - } - - for (Array<String> rec : satisfyingHostRecords) { - String[] row = rec.data; - for (int i = 0; i < row.length; i++) { - min[i] = orders[i].min(min[i], row[i]); - max[i] = orders[i].max(max[i], row[i]); - } - } - } - - private static class SingleColumnTuple implements IEvaluatableTuple { - - private TblColRef col; - private String value; - - SingleColumnTuple(TblColRef col) { - this.col = col; - } - - @Override - public Object getValue(TblColRef col) { - if (this.col.equals(col)) - return value; - else - throw new IllegalArgumentException("unexpected column " + col); - } - - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.translate; + +import java.util.List; +import java.util.Set; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Array; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.kv.RowKeyColumnOrder; +import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; +import org.apache.kylin.cube.model.CubeDesc.DeriveType; +import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.StringCodeSystem; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.IEvaluatableTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * @author yangli9 + */ +public class DerivedFilterTranslator { + + private static final Logger logger = LoggerFactory.getLogger(DerivedFilterTranslator.class); + + public static Pair<TupleFilter, Boolean> translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) { + + TblColRef derivedCol = compf.getColumn(); + TblColRef[] hostCols = hostInfo.columns; + TblColRef[] pkCols = hostInfo.dimension.getJoin().getPrimaryKeyColumns(); + + if (hostInfo.type == DeriveType.PK_FK) { + assert hostCols.length == 1; + CompareTupleFilter newComp = new CompareTupleFilter(compf.getOperator()); + newComp.addChild(new ColumnTupleFilter(hostCols[0])); + newComp.addChild(new ConstantTupleFilter(compf.getValues())); + return new Pair<TupleFilter, Boolean>(newComp, false); + } + + assert hostInfo.type == DeriveType.LOOKUP; + assert hostCols.length == pkCols.length; + + int di = derivedCol.getColumnDesc().getZeroBasedIndex(); + int[] pi = new int[pkCols.length]; + int hn = hostCols.length; + for (int i = 0; i < hn; i++) { + pi[i] = pkCols[i].getColumnDesc().getZeroBasedIndex(); + } + + Set<Array<String>> satisfyingHostRecords = Sets.newHashSet(); + SingleColumnTuple tuple = new SingleColumnTuple(derivedCol); + for (String[] row : lookup.getAllRows()) { + tuple.value = row[di]; + if (compf.evaluate(tuple, StringCodeSystem.INSTANCE)) { + collect(row, pi, satisfyingHostRecords); + } + } + + TupleFilter translated; + boolean loosened; + if (satisfyingHostRecords.size() > KylinConfig.getInstanceFromEnv().getDerivedInThreshold()) { + logger.info("Deciding to loosen filter on derived filter as host candidates number {} exceeds threshold {}", // + satisfyingHostRecords.size(), KylinConfig.getInstanceFromEnv().getDerivedInThreshold() + ); + translated = buildRangeFilter(hostCols, satisfyingHostRecords); + loosened = true; + } else { + translated = buildInFilter(hostCols, satisfyingHostRecords); + loosened = false; + } + + return new Pair<TupleFilter, Boolean>(translated, loosened); + } + + private static void collect(String[] row, int[] pi, Set<Array<String>> satisfyingHostRecords) { + // TODO when go beyond IN_THRESHOLD, only keep min/max is enough + String[] rec = new String[pi.length]; + for (int i = 0; i < pi.length; i++) { + rec[i] = row[pi[i]]; + } + satisfyingHostRecords.add(new Array<String>(rec)); + } + + private static TupleFilter buildInFilter(TblColRef[] hostCols, Set<Array<String>> satisfyingHostRecords) { + if (satisfyingHostRecords.size() == 0) { + return ConstantTupleFilter.FALSE; + } + + int hn = hostCols.length; + if (hn == 1) { + CompareTupleFilter in = new CompareTupleFilter(FilterOperatorEnum.IN); + in.addChild(new ColumnTupleFilter(hostCols[0])); + in.addChild(new ConstantTupleFilter(asValues(satisfyingHostRecords))); + return in; + } else { + LogicalTupleFilter or = new LogicalTupleFilter(FilterOperatorEnum.OR); + for (Array<String> rec : satisfyingHostRecords) { + LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND); + for (int i = 0; i < hn; i++) { + CompareTupleFilter eq = new CompareTupleFilter(FilterOperatorEnum.EQ); + eq.addChild(new ColumnTupleFilter(hostCols[i])); + eq.addChild(new ConstantTupleFilter(rec.data[i])); + and.addChild(eq); + } + or.addChild(and); + } + return or; + } + } + + private static List<String> asValues(Set<Array<String>> satisfyingHostRecords) { + List<String> values = Lists.newArrayListWithCapacity(satisfyingHostRecords.size()); + for (Array<String> rec : satisfyingHostRecords) { + values.add(rec.data[0]); + } + return values; + } + + private static LogicalTupleFilter buildRangeFilter(TblColRef[] hostCols, Set<Array<String>> satisfyingHostRecords) { + int hn = hostCols.length; + String[] min = new String[hn]; + String[] max = new String[hn]; + findMinMax(satisfyingHostRecords, hostCols, min, max); + LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND); + for (int i = 0; i < hn; i++) { + CompareTupleFilter compMin = new CompareTupleFilter(FilterOperatorEnum.GTE); + compMin.addChild(new ColumnTupleFilter(hostCols[i])); + compMin.addChild(new ConstantTupleFilter(min[i])); + and.addChild(compMin); + CompareTupleFilter compMax = new CompareTupleFilter(FilterOperatorEnum.LTE); + compMax.addChild(new ColumnTupleFilter(hostCols[i])); + compMax.addChild(new ConstantTupleFilter(max[i])); + and.addChild(compMax); + } + return and; + } + + private static void findMinMax(Set<Array<String>> satisfyingHostRecords, TblColRef[] hostCols, String[] min, String[] max) { + + RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length]; + for (int i = 0; i < hostCols.length; i++) { + orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType()); + } + + for (Array<String> rec : satisfyingHostRecords) { + String[] row = rec.data; + for (int i = 0; i < row.length; i++) { + min[i] = orders[i].min(min[i], row[i]); + max[i] = orders[i].max(max[i], row[i]); + } + } + } + + private static class SingleColumnTuple implements IEvaluatableTuple { + + private TblColRef col; + private String value; + + SingleColumnTuple(TblColRef col) { + this.col = col; + } + + @Override + public Object getValue(TblColRef col) { + if (this.col.equals(col)) + return value; + else + throw new IllegalArgumentException("unexpected column " + col); + } + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java index 1e05eb8..bfa398e 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java @@ -1,132 +1,132 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.translate; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class FuzzyValueCombination { - - private static class Dim<E> { - TblColRef col; - Set<E> values; - } - - private static final Set SINGLE_NULL_SET = Sets.newHashSet(); - - static { - SINGLE_NULL_SET.add(null); - } - - public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) { - Collections.emptyMap(); - Dim[] dims = toDims(fuzzyValues); - // If a query has many IN clause and each IN clause has many values, then it will easily generate - // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked - // on it. So simply choose to abandon all fuzzy keys in this case. - if (exceedCap(dims, cap)) { - return Lists.newArrayList(); - } else { - return combination(dims); - } - } - - @SuppressWarnings("unchecked") - private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) { - - List<Map<TblColRef, E>> result = Lists.newArrayList(); - - int emptyDims = 0; - for (Dim dim : dims) { - if (dim.values.isEmpty()) { - dim.values = SINGLE_NULL_SET; - emptyDims++; - } - } - if (emptyDims == dims.length) { - return result; - } - - Map<TblColRef, E> r = Maps.newHashMap(); - Iterator<E>[] iters = new Iterator[dims.length]; - int level = 0; - while (true) { - Dim dim = dims[level]; - if (iters[level] == null) { - iters[level] = dim.values.iterator(); - } - - Iterator<E> it = iters[level]; - if (it.hasNext() == false) { - if (level == 0) - break; - r.remove(dim.col); - iters[level] = null; - level--; - continue; - } - - r.put(dim.col, it.next()); - if (level == dims.length - 1) { - result.add(new HashMap<TblColRef, E>(r)); - } else { - level++; - } - } - return result; - } - - private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) { - Dim[] dims = new Dim[fuzzyValues.size()]; - int i = 0; - for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) { - dims[i] = new Dim(); - dims[i].col = entry.getKey(); - dims[i].values = entry.getValue(); - if (dims[i].values == null) - dims[i].values = Collections.emptySet(); - i++; - } - return dims; - } - - private static boolean exceedCap(Dim[] dims, long cap) { - return combCount(dims) > cap; - } - - private static long combCount(Dim[] dims) { - long count = 1; - for (Dim dim : dims) { - count *= Math.max(dim.values.size(), 1); - } - return count; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.translate; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class FuzzyValueCombination { + + private static class Dim<E> { + TblColRef col; + Set<E> values; + } + + private static final Set SINGLE_NULL_SET = Sets.newHashSet(); + + static { + SINGLE_NULL_SET.add(null); + } + + public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) { + Collections.emptyMap(); + Dim[] dims = toDims(fuzzyValues); + // If a query has many IN clause and each IN clause has many values, then it will easily generate + // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked + // on it. So simply choose to abandon all fuzzy keys in this case. + if (exceedCap(dims, cap)) { + return Lists.newArrayList(); + } else { + return combination(dims); + } + } + + @SuppressWarnings("unchecked") + private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) { + + List<Map<TblColRef, E>> result = Lists.newArrayList(); + + int emptyDims = 0; + for (Dim dim : dims) { + if (dim.values.isEmpty()) { + dim.values = SINGLE_NULL_SET; + emptyDims++; + } + } + if (emptyDims == dims.length) { + return result; + } + + Map<TblColRef, E> r = Maps.newHashMap(); + Iterator<E>[] iters = new Iterator[dims.length]; + int level = 0; + while (true) { + Dim dim = dims[level]; + if (iters[level] == null) { + iters[level] = dim.values.iterator(); + } + + Iterator<E> it = iters[level]; + if (it.hasNext() == false) { + if (level == 0) + break; + r.remove(dim.col); + iters[level] = null; + level--; + continue; + } + + r.put(dim.col, it.next()); + if (level == dims.length - 1) { + result.add(new HashMap<TblColRef, E>(r)); + } else { + level++; + } + } + return result; + } + + private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) { + Dim[] dims = new Dim[fuzzyValues.size()]; + int i = 0; + for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) { + dims[i] = new Dim(); + dims[i].col = entry.getKey(); + dims[i].values = entry.getValue(); + if (dims[i].values == null) + dims[i].values = Collections.emptySet(); + i++; + } + return dims; + } + + private static boolean exceedCap(Dim[] dims, long cap) { + return combCount(dims) > cap; + } + + private static long combCount(Dim[] dims) { + long count = 1; + for (Dim dim : dims) { + count *= Math.max(dim.values.size(), 1); + } + return count; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java index 2193eab..074b271 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java @@ -1,77 +1,77 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr; - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr; + import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.source.ReadableTable; - -/** - */ -public class DFSFileTable implements ReadableTable { - - public static final String DELIM_AUTO = "auto"; - public static final String DELIM_COMMA = ","; - - String path; - String delim; - int nColumns; - - public DFSFileTable(String path, int nColumns) { - this(path, DELIM_AUTO, nColumns); - } - - public DFSFileTable(String path, String delim, int nColumns) { - this.path = path; - this.delim = delim; - this.nColumns = nColumns; - } - - public String getColumnDelimeter() { - return delim; - } - - @Override + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.source.ReadableTable; + +/** + */ +public class DFSFileTable implements ReadableTable { + + public static final String DELIM_AUTO = "auto"; + public static final String DELIM_COMMA = ","; + + String path; + String delim; + int nColumns; + + public DFSFileTable(String path, int nColumns) { + this(path, DELIM_AUTO, nColumns); + } + + public DFSFileTable(String path, String delim, int nColumns) { + this.path = path; + this.delim = delim; + this.nColumns = nColumns; + } + + public String getColumnDelimeter() { + return delim; + } + + @Override @Deprecated - public TableReader getReader() throws IOException { - return new DFSFileTableReader(path, delim, nColumns); - } - - @Override - public TableSignature getSignature() throws IOException { - try { - Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path); - return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond()); - } catch (FileNotFoundException ex) { - return null; - } - } - + public TableReader getReader() throws IOException { + return new DFSFileTableReader(path, delim, nColumns); + } + + @Override + public TableSignature getSignature() throws IOException { + try { + Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path); + return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond()); + } catch (FileNotFoundException ex) { + return null; + } + } + public Collection<TableReader> getReaders() { ArrayList<TableReader> readers = new ArrayList<>(); try { @@ -95,33 +95,33 @@ public class DFSFileTable implements ReadableTable { return readers; } - @Override - public String toString() { - return path; - } - - public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException { - FileSystem fs = HadoopUtil.getFileSystem(path); - - // get all contained files if path is directory - ArrayList<FileStatus> allFiles = new ArrayList<>(); - FileStatus status = fs.getFileStatus(new Path(path)); - if (status.isFile()) { - allFiles.add(status); - } else { - FileStatus[] listStatus = fs.listStatus(new Path(path)); - allFiles.addAll(Arrays.asList(listStatus)); - } - - long size = 0; - long lastModified = 0; - for (FileStatus file : allFiles) { - size += file.getLen(); - lastModified = Math.max(lastModified, file.getModificationTime()); - } - - return Pair.newPair(size, lastModified); - } + @Override + public String toString() { + return path; + } + + public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException { + FileSystem fs = HadoopUtil.getFileSystem(path); + + // get all contained files if path is directory + ArrayList<FileStatus> allFiles = new ArrayList<>(); + FileStatus status = fs.getFileStatus(new Path(path)); + if (status.isFile()) { + allFiles.add(status); + } else { + FileStatus[] listStatus = fs.listStatus(new Path(path)); + allFiles.addAll(Arrays.asList(listStatus)); + } + + long size = 0; + long lastModified = 0; + for (FileStatus file : allFiles) { + size += file.getLen(); + lastModified = Math.max(lastModified, file.getModificationTime()); + } + + return Pair.newPair(size, lastModified); + } private boolean isExceptionSayingNotSeqFile(IOException e) { if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile")) @@ -132,4 +132,4 @@ public class DFSFileTable implements ReadableTable { return false; } -} +}