This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b2781a7076bf99fe413d8e9e57525de77122f6d0 Author: XiaoxiangYu <x...@apache.org> AuthorDate: Mon Aug 22 18:36:12 2022 +0800 KYLIN-5229 Use Hashmap in PropertiesDelegate other than Properties --- .../service/task/QueryHistoryTaskScheduler.java | 2 +- .../apache/kylin/common/PropertiesDelegate.java | 48 ++++++++++++++-------- .../job/impl/threadpool/NDefaultScheduler.java | 14 ++++--- 3 files changed, 40 insertions(+), 24 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java index 4219fc538c..977dcfb5b8 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java @@ -94,7 +94,7 @@ public class QueryHistoryTaskScheduler { queryHistoryAccelerateRunner = new QueryHistoryAccelerateRunner(false); queryHistoryMetaUpdateRunner = new QueryHistoryMetaUpdateRunner(); if (querySmartSupporter == null && SpringContext.getApplicationContext() != null - && !KylinConfig.vendor().equals("kylin")) { + && KylinConfig.vendor().equals("kyligence")) { querySmartSupporter = SpringContext.getBean(QuerySmartSupporter.class); } log.debug("New QueryHistoryAccelerateScheduler created by project {}", project); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/PropertiesDelegate.java b/src/core-common/src/main/java/org/apache/kylin/common/PropertiesDelegate.java index 4714bfb0fb..671a20d0b1 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/PropertiesDelegate.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/PropertiesDelegate.java @@ -18,36 +18,42 @@ package org.apache.kylin.common; +import com.google.common.collect.Maps; +import io.kyligence.config.core.loader.IExternalConfigLoader; +import io.kyligence.config.external.loader.NacosExternalConfigLoader; +import lombok.EqualsAndHashCode; + +import java.util.Collections; import java.util.Enumeration; import java.util.Map; import java.util.Properties; import java.util.Set; - -import io.kyligence.config.core.loader.IExternalConfigLoader; -import lombok.EqualsAndHashCode; +import java.util.concurrent.ConcurrentMap; @EqualsAndHashCode public class PropertiesDelegate extends Properties { @EqualsAndHashCode.Include - private final Properties properties; + private final ConcurrentMap<Object, Object> properties = Maps.newConcurrentMap(); @EqualsAndHashCode.Include private final transient IExternalConfigLoader configLoader; public PropertiesDelegate(Properties properties, IExternalConfigLoader configLoader) { - this.properties = properties; + if(configLoader != null) this.properties.putAll(configLoader.getProperties()); + this.properties.putAll(properties); this.configLoader = configLoader; } - public synchronized void reloadProperties(Properties properties) { + public void reloadProperties(Properties properties) { this.properties.clear(); + if(configLoader != null) this.properties.putAll(configLoader.getProperties()); this.properties.putAll(properties); } @Override public String getProperty(String key) { - String property = this.properties.getProperty(key); + String property = (String) this.properties.get(key); if (property == null && this.configLoader != null) { return configLoader.getProperty(key); } @@ -64,12 +70,12 @@ public class PropertiesDelegate extends Properties { } @Override - public synchronized Object put(Object key, Object value) { + public Object put(Object key, Object value) { return this.properties.put(key, value); } @Override - public synchronized Object setProperty(String key, String value) { + public Object setProperty(String key, String value) { return this.put(key, value); } @@ -79,21 +85,29 @@ public class PropertiesDelegate extends Properties { } @Override - public synchronized int size() { + public int size() { return getAllProperties().size(); } @Override - public synchronized Enumeration<Object> keys() { - return getAllProperties().keys(); + public Enumeration<Object> keys() { + return Collections.enumeration(getAllProperties().keySet()); } - private synchronized Properties getAllProperties() { - Properties propertiesView = new Properties(); - if (this.configLoader != null) { + private ConcurrentMap<Object, Object> getAllProperties() { + // When KylinExternalConfigLoader is enabled, properties is static + if (configLoader == null || configLoader.getClass().equals(KylinExternalConfigLoader.class)) { + return properties; + } else if (configLoader.getClass().equals(NacosExternalConfigLoader.class)) { + // When NacosExternalConfigLoader enabled, fetch config entries from remote for each call + // TODO: Kylin should call remote server in periodically, otherwise query concurrency + // maybe impacted badly + ConcurrentMap<Object, Object> propertiesView = Maps.newConcurrentMap(); propertiesView.putAll(this.configLoader.getProperties()); + propertiesView.putAll(this.properties); + return propertiesView; + } else { + throw new IllegalArgumentException(configLoader.getClass() + " is not supported "); } - propertiesView.putAll(this.properties); - return propertiesView; } } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java index 8297b92239..2cd9cc0993 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java @@ -172,15 +172,17 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { logger.info("Fetching jobs every {} seconds", pollSecond); val fetcher = new FetcherRunner(this, jobPool, fetcherPool); - if (config.isCheckQuotaStorageEnabled()) { - fetcherPool.scheduleWithFixedDelay(new QuotaStorageCheckRunner(this), RandomUtils.nextInt(0, pollSecond), + fetcherPool.scheduleWithFixedDelay(new JobCheckRunner(this), RandomUtils.nextInt(0, pollSecond), pollSecond, + TimeUnit.SECONDS); + if(KylinConfig.vendor().equals("kyligence")) { + fetcherPool.scheduleWithFixedDelay(new LicenseCapacityCheckRunner(this), RandomUtils.nextInt(0, pollSecond), pollSecond, TimeUnit.SECONDS); + if (config.isCheckQuotaStorageEnabled()) { + fetcherPool.scheduleWithFixedDelay(new QuotaStorageCheckRunner(this), RandomUtils.nextInt(0, pollSecond), + pollSecond, TimeUnit.SECONDS); + } } - fetcherPool.scheduleWithFixedDelay(new JobCheckRunner(this), RandomUtils.nextInt(0, pollSecond), pollSecond, - TimeUnit.SECONDS); - fetcherPool.scheduleWithFixedDelay(new LicenseCapacityCheckRunner(this), RandomUtils.nextInt(0, pollSecond), - pollSecond, TimeUnit.SECONDS); fetcherPool.scheduleWithFixedDelay(fetcher, RandomUtils.nextInt(0, pollSecond), pollSecond, TimeUnit.SECONDS); hasStarted.set(true); }