This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 8a21389 KYLIN-4906 Support query/job server dynamic register and discovery in kylin4 8a21389 is described below commit 8a21389f685dfbbcd9c9254e578d7e8dcf70f078 Author: zhengshengjun <shengjun_zh...@sina.com> AuthorDate: Thu Feb 18 18:02:21 2021 +0800 KYLIN-4906 Support query/job server dynamic register and discovery in kylin4 (cherry picked from commit fa37255b15bdf69e347ec747baf0731ce6634b39) (cherry picked from commit fa37255b15bdf69e347ec747baf0731ce6634b39) (cherry picked from commit fa37255b15bdf69e347ec747baf0731ce6634b39) --- core-common/pom.xml | 5 + .../java/org/apache/kylin/common/KConstants.java | 23 +++ .../java/org/apache/kylin/common/KylinConfig.java | 14 +- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../java/org/apache/kylin/common/ServerMode.java | 71 -------- .../org/apache/kylin/common/util/ServerMode.java | 5 +- .../common/zookeeper/KylinServerDiscovery.java | 197 +++++++++++++++++++++ .../kylin/common/zookeeper}/ExampleServer.java | 26 +-- .../common/zookeeper/KylinServerDiscoveryTest.java | 46 ++--- .../kylin/job/impl/curator/CuratorScheduler.java | 139 +-------------- .../job/impl/threadpool/DefaultScheduler.java | 11 +- .../job/impl/threadpool/DistributedScheduler.java | 11 +- .../kylin/rest/init/InitialSparderContext.java | 4 +- .../kylin/rest/service/AclTableMigrationTool.java | 3 +- .../org/apache/kylin/rest/service/JobService.java | 6 + .../apache/kylin/rest/service/QueryService.java | 8 +- 16 files changed, 298 insertions(+), 275 deletions(-) diff --git a/core-common/pom.xml b/core-common/pom.xml index 093d5ce..4680d4d 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -50,6 +50,11 @@ <artifactId>jackson-databind</artifactId> </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.freemarker</groupId> 
<artifactId>freemarker</artifactId> </dependency> diff --git a/core-common/src/main/java/org/apache/kylin/common/KConstants.java b/core-common/src/main/java/org/apache/kylin/common/KConstants.java new file mode 100644 index 0000000..5e1723c --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/KConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.common; + +public class KConstants { + public static final int DEFAULT_SERVICE_PORT = 7070; +} diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 7805a0f..a7d5b8b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -18,7 +18,6 @@ package org.apache.kylin.common; -import org.apache.kylin.shaded.com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.restclient.RestClient; @@ -48,6 +47,9 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import org.apache.kylin.shaded.com.google.common.base.Strings; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; + /** */ public class KylinConfig extends KylinConfigBase { @@ -531,7 +533,15 @@ public class KylinConfig extends KylinConfigBase { String value = entry.getValue().toString(); orderedProperties.setProperty(key, value); } - + // Reset some properties which might be overridden by system properties + String[] systemProps = { "kylin.server.cluster-servers", "kylin.server.cluster-servers-with-mode" }; + for (String sysProp : systemProps) { + String sysPropValue = System.getProperty(sysProp); + if (!Strings.isNullOrEmpty(sysPropValue)) { + orderedProperties.setProperty(sysProp, sysPropValue); + } + } + final StringBuilder sb = new StringBuilder(); for (Map.Entry<String, String> entry : orderedProperties.entrySet()) { sb.append(entry.getKey() + "=" + entry.getValue()).append('\n'); diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 0fd24e2..2ed1cb5 100644 ---
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2163,6 +2163,10 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.server.host-address", "localhost:7070"); } + public boolean getServerSelfDiscoveryEnabled() { + return Boolean.parseBoolean(getOptional("kylin.server.self-discovery-enabled", FALSE)); + } + public String getClusterName() { String key = "kylin.server.cluster-name"; String clusterName = this.getOptional(key, getMetadataUrlPrefix()); diff --git a/core-common/src/main/java/org/apache/kylin/common/ServerMode.java b/core-common/src/main/java/org/apache/kylin/common/ServerMode.java deleted file mode 100644 index fb3624d..0000000 --- a/core-common/src/main/java/org/apache/kylin/common/ServerMode.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.common; - -public enum ServerMode { - - ALL("all"), JOB("job"), QUERY("query"); - - private final String name; - - ServerMode(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - private static void validate(KylinConfig config) { - assert config != null; - } - - private static boolean match(ServerMode serverMode, KylinConfig config) { - validate(config); - return serverMode.getName().equals(config.getServerMode()); - } - - public static boolean isJob(KylinConfig config) { - return isJobOnly(config) || isAll(config); - } - - public static boolean isJob(String serverMode) { - return ALL.name.equals(serverMode) || JOB.name.equals(serverMode); - } - - public static boolean isJobOnly(KylinConfig config) { - return match(JOB, config); - } - - public static boolean isQueryOnly(KylinConfig config) { - return match(QUERY, config); - } - - public static boolean isQuery(KylinConfig config) { - return isQueryOnly(config) || isAll(config); - } - - public static boolean isAll(KylinConfig config) { - return match(ALL, config); - } - - public static boolean isQuery(String serverMode) { - return ALL.name.equals(serverMode) || QUERY.name.equals(serverMode); - } -} diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java b/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java index 14b7f18..314abe1 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java @@ -62,7 +62,10 @@ public class ServerMode { public static ServerMode SERVER_MODE = getServerMode(); private static ServerMode getServerMode() { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + return getServerMode(KylinConfig.getInstanceFromEnv()); + } + + public static ServerMode getServerMode(KylinConfig kylinConfig) { String serverModeStr = kylinConfig.getServerMode(); List<String> serverModes = 
Lists.newArrayList(); String[] serverModeArray = serverModeStr.split("\\s*,\\s*"); diff --git a/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java new file mode 100644 index 0000000..07f59ad --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.common.zookeeper; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.InstanceSerializer; +import org.apache.curator.x.discovery.details.ServiceCacheListener; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.common.util.ZKUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class KylinServerDiscovery implements Closeable { + + private static final Logger logger = LoggerFactory.getLogger(KylinServerDiscovery.class); + + public static final String SERVICE_PATH = "/service"; + public static final String SERVICE_NAME = "cluster_servers"; + public static final String SERVICE_PAYLOAD_DESCRIPTION = "description"; + + private static class SingletonHolder { + private static final KylinServerDiscovery INSTANCE = new KylinServerDiscovery(); + } + + public static KylinServerDiscovery getInstance() { + return SingletonHolder.INSTANCE; + } + + private final KylinConfig kylinConfig; + private final 
CuratorFramework curator; + private final ServiceDiscovery<LinkedHashMap> serviceDiscovery; + private final ServiceCache<LinkedHashMap> serviceCache; + + private KylinServerDiscovery() { + this(KylinConfig.getInstanceFromEnv()); + } + + @VisibleForTesting + protected KylinServerDiscovery(KylinConfig kylinConfig) { + this.kylinConfig = kylinConfig; + this.curator = ZKUtil.getZookeeperClient(kylinConfig); + try { + final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class); + serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curator) + .basePath(SERVICE_PATH).serializer(serializer).build(); + serviceDiscovery.start(); + + serviceCache = serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME) + .threadFactory( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("KylinServerTracker-%d").build()) + .build(); + + final AtomicBoolean isFinishInit = new AtomicBoolean(false); + serviceCache.addListener(new ServiceCacheListener() { + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { + } + + @Override + public void cacheChanged() { + logger.info("Service discovery get cacheChanged notification"); + final List<ServiceInstance<LinkedHashMap>> instances = serviceCache.getInstances(); + Map<String, String> instanceNodes = Maps.newHashMapWithExpectedSize(instances.size()); + for (ServiceInstance<LinkedHashMap> entry : instances) { + instanceNodes.put(entry.getAddress() + ":" + entry.getPort(), + (String) entry.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION)); + } + + logger.info("kylin.server.cluster-servers update to " + instanceNodes); + // update cluster servers + System.setProperty("kylin.server.cluster-servers", StringUtil.join(instanceNodes.keySet(), ",")); + + // get servers and its mode(query, job, all) + final String restServersInClusterWithMode = StringUtil.join(instanceNodes.entrySet().stream() + .map(input -> input.getKey() + ":" 
+ input.getValue()).collect(Collectors.toList()), ","); + logger.info("kylin.server.cluster-servers-with-mode update to " + restServersInClusterWithMode); + System.setProperty("kylin.server.cluster-servers-with-mode", restServersInClusterWithMode); + isFinishInit.set(true); + } + }); + serviceCache.start(); + + registerSelf(); + while (!isFinishInit.get()) { + logger.info("Haven't registered, waiting ..."); + Thread.sleep(100L); + } + } catch (Exception e) { + throw new RuntimeException("Fail to initialize due to ", e); + } + } + + private void registerSelf() throws Exception { + String hostAddr = kylinConfig.getServerRestAddress(); + String[] hostAddrInfo = hostAddr.split(":"); + if (hostAddrInfo.length < 2) { + logger.error("kylin.server.host-address {} is not qualified ", hostAddr); + throw new RuntimeException("kylin.server.host-address " + hostAddr + " is not qualified"); + } + String host = hostAddrInfo[0]; + int port = Integer.parseInt(hostAddrInfo[1]); + + String serverMode = kylinConfig.getServerMode(); + registerServer(host, port, serverMode); + } + + private void registerServer(String host, int port, String mode) throws Exception { + final LinkedHashMap<String, String> instanceDetail = new LinkedHashMap<>(); + instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, mode); + + ServiceInstance<LinkedHashMap> thisInstance = ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME) + .payload(instanceDetail).port(port).address(host).build(); + + for (ServiceInstance<LinkedHashMap> instance : serviceCache.getInstances()) { + // Check for registered instances to avoid being double registered + if (instance.getAddress().equals(thisInstance.getAddress()) + && instance.getPort().equals(thisInstance.getPort())) { + serviceDiscovery.unregisterService(instance); + } + } + serviceDiscovery.registerService(thisInstance); + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(serviceCache); + IOUtils.closeQuietly(serviceDiscovery); + } + + 
static class JsonInstanceSerializer<T> implements InstanceSerializer<T> { + private final ObjectMapper mapper; + private final Class<T> payloadClass; + private final JavaType type; + + JsonInstanceSerializer(Class<T> payloadClass) { + this.payloadClass = payloadClass; + this.mapper = new ObjectMapper(); + + // to bypass https://issues.apache.org/jira/browse/CURATOR-394 + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + this.type = this.mapper.getTypeFactory().constructType(ServiceInstance.class); + } + + public ServiceInstance<T> deserialize(byte[] bytes) throws Exception { + ServiceInstance rawServiceInstance = this.mapper.readValue(bytes, this.type); + this.payloadClass.cast(rawServiceInstance.getPayload()); + return rawServiceInstance; + } + + public byte[] serialize(ServiceInstance<T> instance) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + mapper.convertValue(instance.getPayload(), payloadClass); + this.mapper.writeValue(out, instance); + return out.toByteArray(); + } + } +} diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java similarity index 61% rename from core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java rename to core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java index 66e3832..9e9fe95 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java +++ b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java @@ -16,38 +16,28 @@ * limitations under the License. 
*/ -package org.apache.kylin.job.impl.curator; +package org.apache.kylin.common.zookeeper; import java.io.Closeable; import java.io.IOException; -import org.apache.curator.framework.CuratorFramework; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ZKUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.SchedulerException; -import org.apache.kylin.job.lock.MockJobLock; /** */ public class ExampleServer implements Closeable { private String address; - private CuratorScheduler scheduler; + private KylinServerDiscovery discovery; - public ExampleServer(String address) throws Exception { + public ExampleServer(String address) { this.address = address; KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); KylinConfig kylinConfig1 = KylinConfig.createKylinConfig(kylinConfig); kylinConfig1.setProperty("kylin.server.host-address", address); - CuratorFramework client = ZKUtil.newZookeeperClient(kylinConfig1); - scheduler = new CuratorScheduler(client); - scheduler.init(new JobEngineConfig(kylinConfig1), new MockJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } + discovery = new KylinServerDiscovery(kylinConfig1); } public String getAddress() { @@ -56,13 +46,7 @@ public class ExampleServer implements Closeable { @Override public void close() throws IOException { - - if (scheduler!= null) - try { - scheduler.shutdown(); - } catch (SchedulerException e) { - // - } + discovery.close(); } } diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java similarity index 71% rename from core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java rename to core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java index 4cf1410..77dc673 100644 --- 
a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kylin.job.impl.curator; +package org.apache.kylin.common.zookeeper; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; - -import javax.annotation.Nullable; +import java.util.stream.Collectors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -32,7 +31,6 @@ import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.ZKUtil; -import org.apache.kylin.job.execution.ExecutableManager; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -40,19 +38,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.base.Function; import org.apache.kylin.shaded.com.google.common.collect.Lists; /** */ -public class CuratorSchedulerTest extends LocalFileMetadataTestCase { +public class KylinServerDiscoveryTest extends LocalFileMetadataTestCase { - private static final Logger logger = LoggerFactory.getLogger(CuratorSchedulerTest.class); + private static final Logger logger = LoggerFactory.getLogger(KylinServerDiscoveryTest.class); private TestingServer zkTestServer; - protected ExecutableManager jobService; - @Before public void setup() throws Exception { zkTestServer = new TestingServer(); @@ -80,10 +75,9 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase { ServiceDiscovery<LinkedHashMap> serviceDiscovery = null; CuratorFramework curatorClient = null; try { - - final 
CuratorScheduler.JsonInstanceSerializer<LinkedHashMap> serializer = new CuratorScheduler.JsonInstanceSerializer<>( - LinkedHashMap.class); - String servicePath = CuratorScheduler.KYLIN_SERVICE_PATH; + String servicePath = KylinServerDiscovery.SERVICE_PATH; + final KylinServerDiscovery.JsonInstanceSerializer<LinkedHashMap> serializer = + new KylinServerDiscovery.JsonInstanceSerializer<>(LinkedHashMap.class); curatorClient = ZKUtil.newZookeeperClient(zkString, new ExponentialBackoffRetry(3000, 3)); serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient) .basePath(servicePath).serializer(serializer).build(); @@ -94,36 +88,32 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase { Collection<String> serviceNames = serviceDiscovery.queryForNames(); Assert.assertTrue(serviceNames.size() == 1); - Assert.assertTrue(CuratorScheduler.SERVICE_NAME.equals(serviceNames.iterator().next())); + Assert.assertTrue(KylinServerDiscovery.SERVICE_NAME.equals(serviceNames.iterator().next())); Collection<ServiceInstance<LinkedHashMap>> instances = serviceDiscovery - .queryForInstances(CuratorScheduler.SERVICE_NAME); + .queryForInstances(KylinServerDiscovery.SERVICE_NAME); Assert.assertTrue(instances.size() == 2); List<ServiceInstance<LinkedHashMap>> instancesList = Lists.newArrayList(instances); - final List<String> instanceNodes = Lists.transform(instancesList, - new Function<ServiceInstance<LinkedHashMap>, String>() { - - @Nullable - @Override - public String apply(@Nullable ServiceInstance<LinkedHashMap> stringServiceInstance) { - return (String) stringServiceInstance.getPayload() - .get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION); - } - }); + final List<String> instanceNodes = instancesList.stream() + .map(input -> input.getAddress() + ":" + input.getPort() + ":" + + input.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION)) + .collect(Collectors.toList()); 
Assert.assertTrue(instanceNodes.contains(server1.getAddress() + ":query")); Assert.assertTrue(instanceNodes.contains(server2.getAddress() + ":query")); // stop one server server1.close(); - instances = serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME); + instances = serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME); + ServiceInstance<LinkedHashMap> existingInstance = instances.iterator().next(); Assert.assertTrue(instances.size() == 1); Assert.assertEquals(server2.getAddress() + ":query", - instances.iterator().next().getPayload().get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION)); + existingInstance.getAddress() + ":" + existingInstance.getPort() + ":" + + existingInstance.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION)); // all stop server2.close(); - instances = serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME); + instances = serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME); Assert.assertTrue(instances.size() == 0); } finally { diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java index faa7d71..931dc8a 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java @@ -18,30 +18,16 @@ package org.apache.kylin.job.impl.curator; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; -import java.util.LinkedHashMap; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import 
org.apache.curator.x.discovery.ServiceCache; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.details.InstanceSerializer; -import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.ServerMode; -import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.common.util.ServerMode; import org.apache.kylin.common.util.ZKUtil; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; @@ -51,12 +37,7 @@ import org.apache.kylin.job.lock.JobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.kylin.shaded.com.google.common.base.Function; -import org.apache.kylin.shaded.com.google.common.collect.Lists; +import com.google.common.annotations.VisibleForTesting; public class CuratorScheduler implements Scheduler<AbstractExecutable> { @@ -64,15 +45,10 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { private boolean started = false; private CuratorFramework curatorClient = null; private static CuratorLeaderSelector jobClient = null; - private ServiceDiscovery<LinkedHashMap> serviceDiscovery = null; - private ServiceCache<LinkedHashMap> serviceCache = null; private KylinConfig kylinConfig; private AtomicInteger count = new AtomicInteger(); static final String JOB_ENGINE_LEADER_PATH = "/job_engine/leader"; - static final String KYLIN_SERVICE_PATH = "/service"; - static final String SERVICE_NAME = "kylin"; - static final String SERVICE_PAYLOAD_DESCRIPTION = "description"; // the default 
constructor should exist for reflection initialization public CuratorScheduler() { @@ -100,17 +76,11 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { curatorClient = ZKUtil.getZookeeperClient(kylinConfig); } - final String serverMode = jobEngineConfig.getConfig().getServerMode(); - final String restAddress = kylinConfig.getServerRestAddress(); - try { - registerInstance(restAddress, serverMode); - } catch (Exception e) { - throw new SchedulerException(e); - } + String restAddress = kylinConfig.getServerRestAddress(); String jobEnginePath = JOB_ENGINE_LEADER_PATH; - if (ServerMode.isJob(jobEngineConfig.getConfig())) { + if (ServerMode.getServerMode(kylinConfig).canServeJobBuild()) { jobClient = new CuratorLeaderSelector(curatorClient, jobEnginePath, restAddress, jobEngineConfig); try { logger.info("start Job Engine, lock path is: " + jobEnginePath); @@ -120,79 +90,13 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { throw new SchedulerException(e); } } else { - logger.info("server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler"); + logger.info("server mode: " + jobEngineConfig.getConfig().getServerMode() + + ", no need to run job scheduler"); } started = true; } } - private void registerInstance(String restAddress, String mode) throws Exception { - final String host = restAddress.substring(0, restAddress.indexOf(":")); - final String port = restAddress.substring(restAddress.indexOf(":") + 1); - - final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class); - final String servicePath = KYLIN_SERVICE_PATH; - serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient) - .basePath(servicePath).serializer(serializer).build(); - serviceDiscovery.start(); - - serviceCache = serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME) - .threadFactory(Executors.defaultThreadFactory()).build(); - - 
serviceCache.addListener(new ServiceCacheListener() { - @Override - public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { - } - - @Override - public void cacheChanged() { - logger.info("Service discovery get cacheChanged notification"); - final List<ServiceInstance<LinkedHashMap>> instances = serviceCache.getInstances(); - final List<String> instanceNodes = Lists.transform(instances, - new Function<ServiceInstance<LinkedHashMap>, String>() { - - @Nullable - @Override - public String apply(@Nullable ServiceInstance<LinkedHashMap> stringServiceInstance) { - return (String) stringServiceInstance.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION); - } - }); - - final String restServersInCluster = // - StringUtil.join(instanceNodes.stream().map(input -> { // - String[] split = input.split(":"); // - return split[0] + ":" + split[1]; // - }).collect(Collectors.toList()), ","); // - - - logger.info("kylin.server.cluster-servers update to " + restServersInCluster); - // update cluster servers - System.setProperty("kylin.server.cluster-servers", restServersInCluster); - - // get servers and its mode(query, job, all) - final String restServersInClusterWithMode = StringUtil.join(instanceNodes, ","); - logger.info("kylin.server.cluster-servers-with-mode update to " + restServersInClusterWithMode); - System.setProperty("kylin.server.cluster-servers-with-mode", restServersInClusterWithMode); - } - }); - serviceCache.start(); - - final LinkedHashMap<String, String> instanceDetail = new LinkedHashMap<>(); - - instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, restAddress + ":" + mode); - ServiceInstance<LinkedHashMap> thisInstance = ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME) - .payload(instanceDetail).port(Integer.valueOf(port)).address(host).build(); - - for (ServiceInstance<LinkedHashMap> instance : serviceCache.getInstances()) { - // Check for registered instances to avoid being double registered - if 
(instance.getAddress().equals(thisInstance.getAddress()) - && instance.getPort().equals(thisInstance.getPort())) { - serviceDiscovery.unregisterService(instance); - } - } - serviceDiscovery.registerService(thisInstance); - } - private void monitorJobEngine() { logger.info("Start collect monitor ZK Participants"); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { @@ -220,8 +124,6 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { @Override public void shutdown() throws SchedulerException { - IOUtils.closeQuietly(serviceCache); - IOUtils.closeQuietly(serviceDiscovery); IOUtils.closeQuietly(curatorClient); IOUtils.closeQuietly(jobClient); started = false; @@ -248,33 +150,4 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> { return jobClient; } - static class JsonInstanceSerializer<T> implements InstanceSerializer<T> { - private final ObjectMapper mapper; - private final Class<T> payloadClass; - private final JavaType type; - - JsonInstanceSerializer(Class<T> payloadClass) { - this.payloadClass = payloadClass; - this.mapper = new ObjectMapper(); - - // to bypass https://issues.apache.org/jira/browse/CURATOR-394 - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - this.type = this.mapper.getTypeFactory().constructType(ServiceInstance.class); - } - - public ServiceInstance<T> deserialize(byte[] bytes) throws Exception { - ServiceInstance rawServiceInstance = this.mapper.readValue(bytes, this.type); - this.payloadClass.cast(rawServiceInstance.getPayload()); - return rawServiceInstance; - } - - public byte[] serialize(ServiceInstance<T> instance) throws Exception { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - mapper.convertValue(instance.getPayload(), payloadClass); - this.mapper.writeValue(out, instance); - return out.toByteArray(); - } - } - } diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index f45c2e4..9c4573e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -18,7 +18,6 @@ package org.apache.kylin.job.impl.threadpool; -import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -26,6 +25,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.kylin.common.util.ServerMode; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; @@ -35,11 +35,10 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.collect.Maps; - /** */ public class DefaultScheduler implements Scheduler<AbstractExecutable> { @@ -134,9 +133,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable> { public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException { jobLock = lock; - String serverMode = jobEngineConfig.getConfig().getServerMode(); - if (!("job".equals(serverMode.toLowerCase(Locale.ROOT)) || "all".equals(serverMode.toLowerCase(Locale.ROOT)))) { - logger.info("server mode: " + serverMode + ", no need to run job scheduler"); + if (!ServerMode.SERVER_MODE.canServeJobBuild()) { + logger.info( + "server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler"); 
return; } logger.info("Initializing Job Engine ...."); diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 77717bc..51e7dc0 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -20,7 +20,6 @@ package org.apache.kylin.job.impl.threadpool; import java.io.Closeable; import java.io.IOException; -import java.util.Locale; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; @@ -33,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; +import org.apache.kylin.common.util.ServerMode; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.common.util.ToolUtil; @@ -48,11 +48,10 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.shaded.com.google.common.collect.Maps; - /** * schedule the cubing jobs when several job server running with the same metadata. 
* @@ -107,9 +106,9 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> { @Override public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException { - String serverMode = jobEngineConfig.getConfig().getServerMode(); - if (!("job".equals(serverMode.toLowerCase(Locale.ROOT)) || "all".equals(serverMode.toLowerCase(Locale.ROOT)))) { - logger.info("server mode: " + serverMode + ", no need to run job scheduler"); + if (!ServerMode.SERVER_MODE.canServeJobBuild()) { + logger.info( + "server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler"); return; } logger.info("Initializing Job Engine ...."); diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java index 49714bd..d57c9ce 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java +++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java @@ -19,7 +19,7 @@ package org.apache.kylin.rest.init; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.ServerMode; +import org.apache.kylin.common.util.ServerMode; import org.apache.spark.sql.SparderContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,7 @@ public class InitialSparderContext implements InitializingBean { private void runInitialSparder() { KylinConfig config = KylinConfig.getInstanceFromEnv(); - if (ServerMode.isJobOnly(config) || !config.isAutoStartSparder()) { + if (!ServerMode.SERVER_MODE.canServeQuery() || !config.isAutoStartSparder()) { logger.info("Maybe this is job node, or switch is off, do not need to start Spark, {}", config.isAutoStartSparder()); return; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java index 42db50d..206c618 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java @@ -39,6 +39,7 @@ import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.StringEntity; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.ServerMode; import org.apache.kylin.rest.security.AclConstant; import org.apache.kylin.rest.security.ManagedUser; import org.apache.kylin.rest.security.springacl.AclRecord; @@ -75,7 +76,7 @@ public class AclTableMigrationTool { logger.info("Do not need to migrate acl table data"); return; } else { - if (!kylinConfig.getServerMode().equals("all")) { + if (!ServerMode.SERVER_MODE.canServeAll()) { throw new IllegalStateException( "Please make sure that you have config kylin.server.mode=all before migrating data"); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index e543c22..7cb5d3d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.zookeeper.KylinServerDiscovery; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -134,6 +135,11 @@ public class JobService extends BasicService implements InitializingBean { final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory 
.scheduler(kylinConfig.getSchedulerType()); + if (kylinConfig.getServerSelfDiscoveryEnabled()) { + KylinServerDiscovery.getInstance(); + } + logger.info("Cluster servers: {}", Lists.newArrayList(kylinConfig.getRestServers())); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 7296636..b8f1c56 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -77,6 +77,7 @@ import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.ServerMode; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; @@ -408,10 +409,9 @@ public class QueryService extends BasicService { sqlRequest.setUsername(getUserName()); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - String serverMode = kylinConfig.getServerMode(); - if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase(Locale.ROOT)) - || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase(Locale.ROOT)))) { - throw new BadRequestException(String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), serverMode)); + if (!ServerMode.SERVER_MODE.canServeQuery()) { + throw new BadRequestException( + String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), kylinConfig.getServerMode())); } if (StringUtils.isBlank(sqlRequest.getProject())) { throw new BadRequestException(msg.getEMPTY_PROJECT_NAME());