This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch sync in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 54bdae63cfca137c02cdf8749f93d152a9ac2010 Author: lidongsjtu <lid...@apache.org> AuthorDate: Tue Apr 3 11:11:36 2018 +0800 KYLIN-3315 allow each project to set its own source --- .../test/java/org/apache/kylin/job/DeployUtil.java | 4 +- .../java/org/apache/kylin/cube/CubeManager.java | 4 +- .../org/apache/kylin/dict/lookup/SnapshotCLI.java | 4 +- .../kylin/metadata/TableMetadataManager.java | 4 +- .../kylin/metadata/model/ExternalFilterDesc.java | 21 ++- .../apache/kylin/metadata/model/ISourceAware.java | 4 + .../org/apache/kylin/metadata/model/TableDesc.java | 13 +- .../kylin/metadata/project/ProjectInstance.java | 7 +- .../main/java/org/apache/kylin/source/ISource.java | 8 +- .../org/apache/kylin/source/SourceFactory.java | 62 --------- .../org/apache/kylin/source/SourceManager.java | 154 +++++++++++++++++++++ .../java/org/apache/kylin/engine/mr/MRUtil.java | 10 +- .../kylin/engine/mr/common/JobRelatedMetaUtil.java | 4 +- .../kylin/provision/BuildCubeWithStream.java | 4 +- .../org/apache/kylin/source/SourceManagerTest.java | 61 ++++++++ .../source/hive/ITHiveSourceTableLoaderTest.java | 4 +- .../kylin/source/hive/ITSnapshotManagerTest.java | 4 +- .../source/jdbc/ITJdbcSourceTableLoaderTest.java | 9 +- .../kylin/source/jdbc/ITJdbcTableReaderTest.java | 5 + .../kylin/query/schema/OLAPSchemaFactory.java | 25 ++-- .../org/apache/kylin/query/util/PushDownUtil.java | 3 +- .../kylin/rest/controller/TableController.java | 8 +- .../apache/kylin/rest/job/StorageCleanupJob.java | 4 +- .../org/apache/kylin/rest/service/JobService.java | 4 +- .../apache/kylin/rest/service/TableService.java | 55 ++++---- .../org/apache/kylin/source/hive/HiveSource.java | 12 +- .../org/apache/kylin/source/jdbc/JdbcSource.java | 11 +- .../org/apache/kylin/source/kafka/KafkaSource.java | 47 +++++-- .../tool/metrics/systemcube/KylinTableCreator.java | 15 +- 29 files changed, 401 insertions(+), 169 deletions(-) diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index a418dc9..524c2e4 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -47,7 +47,7 @@ import org.apache.kylin.metadata.model.DataModelManager; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ISampleDataDeployer; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.datagen.ModelDataGenerator; import org.apache.kylin.source.kafka.TimedJsonStreamParser; import org.apache.maven.model.Model; @@ -231,7 +231,7 @@ public class DeployUtil { } tempDir.deleteOnExit(); - ISampleDataDeployer sampleDataDeployer = SourceFactory.getSource(model.getRootFactTable().getTableDesc()) + ISampleDataDeployer sampleDataDeployer = SourceManager.getSource(model.getRootFactTable().getTableDesc()) .getSampleDataDeployer(); // create hive tables diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index b8deadb..fc2ad3d 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -69,7 +69,7 @@ import org.apache.kylin.metadata.realization.IRealizationProvider; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.IReadableTable; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.SourcePartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1052,7 +1052,7 @@ public class CubeManager implements IRealizationProvider { SnapshotManager snapshotMgr = getSnapshotManager(); TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject())); - IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java index 2093d23..f965d18 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; public class SnapshotCLI { @@ -42,7 +42,7 @@ public class SnapshotCLI { if (tableDesc == null) throw new IllegalArgumentException("Not table found by " + table); - SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID); + SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID); System.out.println("resource path updated: " + snapshot.getResourcePath()); } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java index 42233b7..116e210 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java @@ -118,7 +118,7 @@ public class TableMetadataManager { @Override protected TableDesc initEntityAfterReload(TableDesc t, String resourceName) { String prj = TableDesc.parseResourcePath(resourceName).getSecond(); - t.init(prj); + t.init(config, prj); return t; } }; @@ -237,7 +237,7 @@ public class TableMetadataManager { public void saveSourceTable(TableDesc srcTable, String prj) throws IOException { try (AutoLock lock = srcTableMapLock.lockForWrite()) { - srcTable.init(prj); + srcTable.init(config, prj); srcTableCrud.save(srcTable); } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java index 35018c7..7ef84aa 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ExternalFilterDesc.java @@ -18,6 +18,7 @@ package org.apache.kylin.metadata.model; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.filter.function.Functions; @@ -62,7 +63,7 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA public String resourceName() { return name; } - + public String getFilterResourceIdentifier() { return filterResourceIdentifier; } @@ -94,7 +95,8 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA @Override public String toString() { - return "ExternalFilterDesc [ name=" + name + " filter table resource identifier " + this.filterResourceIdentifier + "]"; + return "ExternalFilterDesc [ name=" + name + " filter table resource identifier " + + this.filterResourceIdentifier + "]"; } /** create a mockup table for unit test */ @@ -104,11 +106,6 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA return mockup; } - @Override - public int getSourceType() { - return sourceType; - } - public void setSourceType(int sourceType) { this.sourceType = sourceType; } @@ -120,4 +117,14 @@ public class ExternalFilterDesc extends RootPersistentEntity implements ISourceA public void setDescription(String description) { this.description = description; } + + @Override + public int getSourceType() { + return sourceType; + } + + @Override + public KylinConfig getConfig() { + return null; + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java index 7ab1bca..eab3e2c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java @@ -18,6 +18,8 @@ package org.apache.kylin.metadata.model; +import org.apache.kylin.common.KylinConfig; + public interface ISourceAware { public static final int ID_HIVE = 0; @@ -27,4 +29,6 @@ public interface ISourceAware { public static final int ID_JDBC = 8; int getSourceType(); + + KylinConfig getConfig(); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index be278de..a9e9877 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.Pair; @@ -98,6 +99,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { private String dataGen; private String project; + private KylinConfig config; private DatabaseDesc database = new DatabaseDesc(); private String identity = null; private boolean isBorrowedFromGlobal = false; @@ -121,6 +123,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { } this.project = other.project; + this.config = other.config; this.database.setName(other.getDatabase()); this.identity = other.identity; } @@ -287,9 +290,10 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { return dataGen; } - public void init(String project) { + public void init(KylinConfig config, String project) { this.project = project; - + this.config = config; + if (name != null) name = name.toUpperCase(); @@ -372,6 +376,11 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { return sourceType; } + @Override + public KylinConfig getConfig() { + return config; + } + public void setSourceType(int sourceType) { this.sourceType = sourceType; } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 9b7aaf2..45622f3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -31,6 +31,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.realization.RealizationType; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -47,7 +48,7 @@ import com.google.common.collect.Lists; */ @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class ProjectInstance extends RootPersistentEntity { +public class ProjectInstance extends RootPersistentEntity implements ISourceAware { public static final String DEFAULT_PROJECT_NAME = "default"; @@ -338,4 +339,8 @@ public class ProjectInstance extends RootPersistentEntity { return "ProjectDesc [name=" + name + "]"; } + @Override + public int getSourceType() { + return getConfig().getDefaultSource(); + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index 42548ae..2c5a922 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -18,13 +18,15 @@ package org.apache.kylin.source; +import java.io.Closeable; + import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; /** * Represents a kind of source to Kylin, like Hive. */ -public interface ISource { +public interface ISource extends Closeable { /** * Return an explorer to sync table metadata from the data source. @@ -41,13 +43,13 @@ public interface ISource { * Return a ReadableTable that can iterate through the rows of given table. */ IReadableTable createReadableTable(TableDesc tableDesc); - + /** * Give the source a chance to enrich a SourcePartition before build start. * Particularly, Kafka source use this chance to define start/end offsets within each partition. */ SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition); - + /** * Return an object that is responsible for deploying sample (CSV) data to the source database. * For testing purpose. diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java deleted file mode 100644 index 365b505..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.source; - -import java.util.List; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ImplementationSwitch; -import org.apache.kylin.metadata.model.ISourceAware; -import org.apache.kylin.metadata.model.TableDesc; - -public class SourceFactory { - - // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. - private static ThreadLocal<ImplementationSwitch<ISource>> sources = new ThreadLocal<>(); - - private static ISource getSource(int sourceType) { - ImplementationSwitch<ISource> current = sources.get(); - if (current == null) { - current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getSourceEngines(), ISource.class); - sources.set(current); - } - return current.get(sourceType); - } - - public static ISource getDefaultSource() { - return getSource(KylinConfig.getInstanceFromEnv().getDefaultSource()); - } - - public static ISource getSource(ISourceAware aware) { - return getSource(aware.getSourceType()); - } - - public static IReadableTable createReadableTable(TableDesc table) { - return getSource(table).createReadableTable(table); - } - - public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) { - return getSource(table).adaptToBuildEngine(engineInterface); - } - - public static List<String> getMRDependentResources(TableDesc table) { - return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table); - } - -} diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java new file mode 100644 index 0000000..62c4368 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class SourceManager { + private static final Logger logger = LoggerFactory.getLogger(SourceManager.class); + + private final KylinConfig systemConfig; + private final Cache<String, ISource> sourceMap; + + public static SourceManager getInstance(KylinConfig config) { + return config.getManager(SourceManager.class); + } + + // called by reflection + static SourceManager newInstance(KylinConfig config) throws IOException { + return new SourceManager(config); + } + + // ============================================ + + private SourceManager(KylinConfig config) { + this.systemConfig = config; + this.sourceMap = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS) + .removalListener(new RemovalListener<String, ISource>() { + @Override + public void onRemoval(RemovalNotification<String, ISource> entry) { + ISource s = entry.getValue(); + if (s != null) { + try { + s.close(); + } catch (Throwable e) { + logger.error("Failed to close ISource: {}", s.getClass().getName(), e); + } + } + } + }).build(); + } + + public ISource getCachedSource(ISourceAware aware) { + String key = createSourceCacheKey(aware); + ISource source = sourceMap.getIfPresent(key); + if (source != null) + return source; + + synchronized (this) { + source = sourceMap.getIfPresent(key); + if (source != null) + return source; + + source = createSource(aware); + sourceMap.put(key, source); + return source; + } + } + + public ISource getProjectSource(String projectName) { + ProjectInstance projectInstance = ProjectManager.getInstance(systemConfig).getProject(projectName); + if (projectInstance != null) + return getCachedSource(projectInstance); + else + return getDefaultSource(); + } + + private String createSourceCacheKey(ISourceAware aware) { + StringBuilder builder = new StringBuilder(); + builder.append(aware.getSourceType()).append('|'); + + KylinConfig config = aware.getConfig(); + builder.append(config.getJdbcSourceConnectionUrl()).append('|'); + builder.append(config.getJdbcSourceDriver()).append('|'); + builder.append(config.getJdbcSourceUser()).append('|'); + builder.append(config.getJdbcSourceFieldDelimiter()).append('|'); + builder.append(config.getJdbcSourceDialect()).append('|'); + return builder.toString(); // jdbc password not needed, because url+user should be identical. + } + + private ISource createSource(ISourceAware aware) { + String sourceClazz = systemConfig.getSourceEngines().get(aware.getSourceType()); + try { + return ClassUtil.forName(sourceClazz, ISource.class).getDeclaredConstructor(KylinConfig.class) + .newInstance(aware.getConfig()); + } catch (Throwable e) { + logger.error("Failed to create source: SourceType={}", aware.getSourceType(), e); + return null; + } + } + + // ========================================================== + + public static ISource getSource(ISourceAware aware) { + return getInstance(aware.getConfig()).getCachedSource(aware); + } + + public static ISource getDefaultSource() { + final KylinConfig sysConfig = KylinConfig.getInstanceFromEnv(); + return getSource(new ISourceAware() { + @Override + public int getSourceType() { + return sysConfig.getDefaultSource(); + } + + @Override + public KylinConfig getConfig() { + return sysConfig; + } + }); + } + + public static IReadableTable createReadableTable(TableDesc table) { + return getSource(table).createReadableTable(table); + } + + public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) { + return getSource(table).adaptToBuildEngine(engineInterface); + } + + public static List<String> getMRDependentResources(TableDesc table) { + return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table); + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 124e5e7..3a9d0ed 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -33,23 +33,23 @@ import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.storage.StorageFactory; public class MRUtil { public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) { IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg); - return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc); + return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc); } public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) { TableDesc t = getTableDesc(tableName, prj); - return SourceFactory.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t); + return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t); } public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) { - return SourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc); + return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc); } private static TableDesc getTableDesc(String tableName, String prj) { @@ -73,7 +73,7 @@ public class MRUtil { } public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) { - return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg); + return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg); } public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java index c34245b..2cd1841 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java @@ -24,7 +24,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +47,7 @@ public class JobRelatedMetaUtil { for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) { TableDesc table = tableRef.getTableDesc(); dumpList.add(table.getResourcePath()); - dumpList.addAll(SourceFactory.getMRDependentResources(table)); + dumpList.addAll(SourceManager.getMRDependentResources(table)); } return dumpList; diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 181e8b9..216ccc1 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -62,7 +62,7 @@ import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.rest.job.StorageCleanupJob; import org.apache.kylin.source.ISource; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; @@ -282,7 +282,7 @@ public class BuildCubeWithStream { protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeInstance cubeInstance = cubeManager.getCube(cubeName); - ISource source = SourceFactory.getSource(cubeInstance); + ISource source = SourceManager.getSource(cubeInstance); SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null)); CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition); diff --git a/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java new file mode 100644 index 0000000..1d7440e --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/source/SourceManagerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.model.ISourceAware; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class SourceManagerTest extends LocalFileMetadataTestCase { + @BeforeClass + public static void beforeClass() { + staticCreateTestMetadata(); + } + + @AfterClass + public static void afterClass() throws Exception { + cleanAfterClass(); + } + + @Test + public void testGetSource() { + final KylinConfig config = getTestConfig(); + SourceManager sourceManager = SourceManager.getInstance(config); + ISource source = sourceManager.getCachedSource(new ISourceAware() { + @Override + public int getSourceType() { + return config.getDefaultSource(); + } + + @Override + public KylinConfig getConfig() { + return config; + } + }); + + Assert.assertEquals(config.getSourceEngines().get(config.getDefaultSource()), source.getClass().getName()); + Assert.assertEquals(source, SourceManager.getDefaultSource()); + Assert.assertEquals(source, SourceManager.getInstance(getTestConfig()).getProjectSource(null)); + } + +} diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java index 8e57bed..a5aea1b 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java @@ -26,7 +26,7 @@ import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase { @Test public void test() throws Exception { - ISource source = SourceFactory.getDefaultSource(); + ISource source = SourceManager.getDefaultSource(); ISourceMetadataExplorer explr = source.getSourceMetadataExplorer(); Pair<TableDesc, TableExtDesc> pair; diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java index 384aa95..031da29 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java @@ -28,7 +28,7 @@ import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.IReadableTable.TableReader; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase { public void basicTest() throws Exception { String tableName = "EDW.TEST_SITES"; TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default"); - IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath(); snapshotMgr.wipeoutCache(); diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java index 3869cb6..557e2e7 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java @@ -35,7 +35,7 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.H2Database; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.datagen.ModelDataGenerator; import org.junit.After; import org.junit.Before; @@ -94,7 +94,7 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple @Test public void test() throws Exception { - ISource source = SourceFactory.getSource(new ITJdbcSourceTableLoaderTest()); + ISource source = SourceManager.getSource(new ITJdbcSourceTableLoaderTest()); ISourceMetadataExplorer explr = source.getSourceMetadataExplorer(); Pair<TableDesc, TableExtDesc> pair; @@ -111,4 +111,9 @@ public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase imple return ISourceAware.ID_JDBC; } + @Override + public KylinConfig getConfig() { + return config; + } + } diff --git a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java index 4a5bfe4..4441178 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java @@ -106,4 +106,9 @@ public class ITJdbcTableReaderTest extends LocalFileMetadataTestCase implements return ISourceAware.ID_JDBC; } + @Override + public KylinConfig getConfig() { + return getTestConfig(); + } + } diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java index 25baf55..a1935fe 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java @@ -49,19 +49,22 @@ public class OLAPSchemaFactory implements SchemaFactory { @Override public Schema create(SchemaPlus parentSchema, String schemaName, Map<String, Object> operand) { String project = (String) operand.get(SCHEMA_PROJECT); - Schema newSchema = new OLAPSchema(project, schemaName, exposeMore()); + Schema newSchema = new OLAPSchema(project, schemaName, exposeMore(project)); return newSchema; } private static Map<String, File> cachedJsons = Maps.newConcurrentMap(); - public static boolean exposeMore() { - return KylinConfig.getInstanceFromEnv().isPushDownEnabled(); + public static boolean exposeMore(String project) { + return ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig() + .isPushDownEnabled(); } public static File createTempOLAPJson(String project, KylinConfig config) { - Collection<TableDesc> tables = ProjectManager.getInstance(config).listExposedTables(project, exposeMore()); + ProjectManager projectManager = ProjectManager.getInstance(config); + KylinConfig projConfig = projectManager.getProject(project).getConfig(); + Collection<TableDesc> tables = projectManager.listExposedTables(project, exposeMore(project)); // "database" in TableDesc correspond to our schema // the logic to decide which schema to be "default" in calcite: @@ -92,17 +95,16 @@ public class OLAPSchemaFactory implements SchemaFactory { int counter = 0; - - + String schemaFactory = projConfig.getSchemaFactory(); for (String schemaName : schemaCounts.keySet()) { out.append(" {\n"); out.append(" \"type\": \"custom\",\n"); out.append(" \"name\": \"" + schemaName + "\",\n"); - out.append(" \"factory\": \"" + KylinConfig.getInstanceFromEnv().getSchemaFactory()+ "\",\n"); + out.append(" \"factory\": \"" + schemaFactory + "\",\n"); out.append(" \"operand\": {\n"); out.append(" \"" + SCHEMA_PROJECT + "\": \"" + project + "\"\n"); out.append(" },\n"); - createOLAPSchemaFunctions(out); + createOLAPSchemaFunctions(projConfig.getUDFs(), out); out.append(" }\n"); if (++counter != schemaCounts.size()) { @@ -132,9 +134,12 @@ public class OLAPSchemaFactory implements SchemaFactory { } } - private static void createOLAPSchemaFunctions(StringBuilder out) throws IOException { + private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out) + throws IOException { Map<String, String> udfs = Maps.newHashMap(); - udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs()); + if (definedUdfs != null) + udfs.putAll(definedUdfs); + for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) { udfs.put(entry.getKey(), entry.getValue().getName()); } diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index cfe16c0..7c88141 100644 --- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -47,6 +47,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.model.tool.CalciteParser; +import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.metadata.realization.NoRealizationFoundException; import org.apache.kylin.metadata.realization.RoutingIndicatorException; @@ -74,7 +75,7 @@ public class PushDownUtil { private static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownQuery(String project, String sql, String defaultSchema, SQLException sqlException, boolean isSelect, boolean isPrepare) throws Exception { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + KylinConfig kylinConfig = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig(); if (!kylinConfig.isPushDownEnabled()) return null; diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java index 35849f0..7ada8cc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -182,9 +182,9 @@ public class TableController extends BasicController { */ @RequestMapping(value = "/hive", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody - private List<String> showHiveDatabases() throws IOException { + private List<String> showHiveDatabases(@RequestParam(value = "project", required = false) String project) throws IOException { try { - return tableService.getHiveDbNames(); + return tableService.getSourceDbNames(project); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage()); @@ -199,9 +199,9 @@ public class TableController extends BasicController { */ @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody - private List<String> showHiveTables(@PathVariable String database) throws IOException { + private List<String> showHiveTables(@PathVariable String database, @RequestParam(value = "project", required = false) String project) throws IOException { try { - return tableService.getHiveTableNames(database); + return tableService.getSourceTableNames(project, database); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage()); diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java index 59bd21f..114050c 100755 --- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java @@ -52,7 +52,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.source.ISourceMetadataExplorer; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +133,7 @@ public class StorageCleanupJob extends AbstractApplication { } protected List<String> getHiveTables() throws Exception { - ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer(); return explr.listTables(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 6afc568..4317ed5 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -61,7 +61,7 @@ import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.source.ISource; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.slf4j.Logger; @@ -230,7 +230,7 @@ public class JobService extends BasicService implements InitializingBean { CubeSegment newSeg = null; try { if (buildType == CubeBuildTypeEnum.BUILD) { - ISource source = SourceFactory.getSource(cube); + ISource source = SourceManager.getSource(cube); SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); src = source.enrichSourcePartitionBeforeBuild(cube, src); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index d737a6a..ace1686 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -29,8 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import javax.annotation.Nullable; - import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; @@ -40,11 +38,11 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.msg.Message; @@ -52,7 +50,7 @@ import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.response.TableDescResponse; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.source.ISourceMetadataExplorer; -import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourceManager; import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -64,8 +62,6 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.SetMultimap; @@ -136,7 +132,7 @@ public class TableService extends BasicService { for (Pair<TableDesc, TableExtDesc> pair : allMeta) { TableDesc tableDesc = pair.getFirst(); TableExtDesc extDesc = pair.getSecond(); - + TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity(), project); if (origTable == null || origTable.getProject() == null) { tableDesc.setUuid(UUID.randomUUID().toString()); @@ -157,7 +153,7 @@ public class TableService extends BasicService { } extDesc.init(project); metaMgr.saveTableExt(extDesc, project); - + saved.add(tableDesc.getIdentity()); } @@ -176,14 +172,15 @@ public class TableService extends BasicService { // load all tables first List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList(); - ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + ProjectInstance projectInstance = getProjectManager().getProject(project); + ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer(); for (Map.Entry<String, String> entry : db2tables.entries()) { Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project); TableDesc tableDesc = pair.getFirst(); Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase())); Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase())); - Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." + entry - .getValue().toUpperCase())); + Preconditions.checkState(tableDesc.getIdentity() + .equals(entry.getKey().toUpperCase() + "." + entry.getValue().toUpperCase())); TableExtDesc extDesc = pair.getSecond(); Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity())); allMeta.add(pair); @@ -191,7 +188,8 @@ public class TableService extends BasicService { return allMeta; } - public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws Exception { + public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) + throws Exception { aclEvaluate.checkProjectAdminPermission(project); String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); Map<String, String[]> result = new HashMap<String, String[]>(); @@ -258,13 +256,14 @@ public class TableService extends BasicService { tableName = normalizeHiveTableName(tableName); TableDesc desc = getTableManager().getTableDesc(tableName, project); - + // unload of legacy global table is not supported for now if (desc == null || desc.getProject() == null) { - logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName, project); + logger.warn("Unload Table {} in Project {} failed, could not find TableDesc or related Project", tableName, + project); return false; } - + tableType = desc.getSourceType(); if (!modelService.isTableInModel(desc, project)) { @@ -274,7 +273,7 @@ public class TableService extends BasicService { List<String> models = modelService.getModelsUsingTable(desc, project); throw new BadRequestException(String.format(msg.getTABLE_IN_USE_BY_MODEL(), models)); } - + // it is a project local table, ready to remove since no model is using it within the project TableMetadataManager metaMgr = getTableManager(); metaMgr.removeTableExt(tableName, project); @@ -313,30 +312,27 @@ public class TableService extends BasicService { /** * + * @param project * @return * @throws Exception */ - public List<String> getHiveDbNames() throws Exception { - ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + public List<String> getSourceDbNames(String project) throws Exception { + ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project) + .getSourceMetadataExplorer(); return explr.listDatabases(); } /** * + * @param project * @param database * @return * @throws Exception */ - public List<String> getHiveTableNames(String database) throws Exception { - ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); - List<String> hiveTableNames = explr.listTables(database); - Iterable<String> kylinApplicationTableNames = Iterables.filter(hiveTableNames, new Predicate<String>() { - @Override - public boolean apply(@Nullable String input) { - return input != null && !input.startsWith(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX); - } - }); - return Lists.newArrayList(kylinApplicationTableNames); + public List<String> getSourceTableNames(String project, String database) throws Exception { + ISourceMetadataExplorer explr = SourceManager.getInstance(getConfig()).getProjectSource(project) + .getSourceMetadataExplorer(); + return explr.listTables(database); } private TableDescResponse cloneTableDesc(TableDesc table, String prj) { @@ -355,7 +351,8 @@ public class TableService extends BasicService { if (cards.length > i) { cardinality.put(columnDesc.getName(), Long.parseLong(cards[i])); } else { - logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length); + logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + + scard + " column array length: " + cdescs.length); break; } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index 129098c..58bd2c3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -18,6 +18,8 @@ package org.apache.kylin.source.hive; +import java.io.IOException; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.IBuildable; @@ -28,8 +30,10 @@ import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; -//used by reflection public class HiveSource implements ISource { + //used by reflection + public HiveSource(KylinConfig config) { + } @Override public ISourceMetadataExplorer getSourceMetadataExplorer() { @@ -53,7 +57,7 @@ public class HiveSource implements ISource { if (tableDesc.isView()) { KylinConfig config = KylinConfig.getInstanceFromEnv(); String tableName = tableDesc.getMaterializedName(); - + tableDesc = new TableDesc(); tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); tableDesc.setName(tableName); @@ -75,4 +79,8 @@ public class HiveSource implements ISource { return new HiveMetadataExplorer(); } + @Override + public void close() throws IOException { + // not needed + } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java index 5e06f90..ae3bbc5 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java @@ -18,6 +18,9 @@ package org.apache.kylin.source.jdbc; +import java.io.IOException; + +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; @@ -27,8 +30,10 @@ import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; -//used by reflection public class JdbcSource implements ISource { + //used by reflection + public JdbcSource(KylinConfig config) { + } @Override public ISourceMetadataExplorer getSourceMetadataExplorer() { @@ -62,4 +67,8 @@ public class JdbcSource implements ISource { return new JdbcExplorer(); } + @Override + public void close() throws IOException { + // not needed + } } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 1142243..0ab83c6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -18,6 +18,7 @@ package org.apache.kylin.source.kafka; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -46,11 +47,14 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; -//used by reflection public class KafkaSource implements ISource { private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class); + //used by reflection + public KafkaSource(KylinConfig config) { + } + @SuppressWarnings("unchecked") @Override public <I> I adaptToBuildEngine(Class<I> engineInterface) { @@ -75,20 +79,25 @@ public class KafkaSource implements ISource { if (range == null || range.start.v.equals(0L)) { final CubeSegment last = cube.getLastSegment(); if (last != null) { - logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd()); + logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + + last.getSourcePartitionOffsetEnd()); // from last seg's end position result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd()); - } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) { - logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart()); + } else if (cube.getDescriptor().getPartitionOffsetStart() != null + && cube.getDescriptor().getPartitionOffsetStart().size() > 0) { + logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + + cube.getDescriptor().getPartitionOffsetStart()); result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart()); } else { // from the topic's earliest offset; - logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset."); + logger.debug( + "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset."); result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube)); } } - final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable()); + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getKafkaConfig(cube.getRootFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); final String topic = kafkaConfig.getTopic(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { @@ -111,7 +120,9 @@ public class KafkaSource implements ISource { for (Integer partitionId : latestOffsets.keySet()) { if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) { if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) { - throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")"); + throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + + result.getSourcePartitionOffsetStart().get(partitionId) + ")"); } } else { throw new IllegalStateException("New partition added in between, retry."); @@ -129,7 +140,8 @@ public class KafkaSource implements ISource { } if (totalStartOffset > totalEndOffset) { - throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset); + throw new IllegalArgumentException( + "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset); } if (totalStartOffset == totalEndOffset) { @@ -155,7 +167,8 @@ public class KafkaSource implements ISource { if (startOffset > 0) { if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) { - throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset"); + throw new IllegalArgumentException( + "When 'startOffset' is > 0, need provide each partition's start offset"); } long totalOffset = 0; @@ -164,13 +177,15 @@ public class KafkaSource implements ISource { } if (totalOffset != startOffset) { - throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'"); + throw new IllegalArgumentException( + "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'"); } } if (endOffset > 0 && endOffset != Long.MAX_VALUE) { if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) { - throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset"); + throw new IllegalArgumentException( + "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset"); } long totalOffset = 0; @@ -179,7 +194,8 @@ public class KafkaSource implements ISource { } if (totalOffset != endOffset) { - throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'"); + throw new IllegalArgumentException( + "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'"); } } } @@ -199,7 +215,8 @@ public class KafkaSource implements ISource { } @Override - public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception { + public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) + throws Exception { throw new UnsupportedOperationException(); } @@ -223,4 +240,8 @@ public class KafkaSource implements ISource { throw new UnsupportedOperationException(); } + @Override + public void close() throws IOException { + // not needed + } } diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java index 8aac466..a2a0616 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java @@ -54,38 +54,39 @@ public class KylinTableCreator { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns); + return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns); } public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, SinkTool sinkTool) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns); + return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns); } public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, SinkTool sinkTool) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns); + return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns); } public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, SinkTool sinkTool) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns); + return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns); } public static TableDesc generateKylinTableForMetricsJobException(KylinConfig kylinConfig, SinkTool sinkTool) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns); + return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns); } - public static TableDesc generateKylinTable(SinkTool sinkTool, String subject, List<Pair<String, String>> columns) { + public static TableDesc generateKylinTable(KylinConfig kylinConfig, SinkTool sinkTool, String subject, + List<Pair<String, String>> columns) { TableDesc kylinTable = new TableDesc(); Pair<String, String> tableNameSplits = ActiveReservoirReporter @@ -107,7 +108,7 @@ public class KylinTableCreator { } kylinTable.setColumns(columnDescs); - kylinTable.init(MetricsManager.SYSTEM_PROJECT); + kylinTable.init(kylinConfig, MetricsManager.SYSTEM_PROJECT); return kylinTable; } -- To stop receiving notification emails like this one, please contact liy...@apache.org.