This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch realtime-streaming in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/realtime-streaming by this push: new c137bc8 KYLIN-3768 Save streaming metadata a standard kylin path in zookeeper c137bc8 is described below commit c137bc80a8bdf49f41a5214541ca42e5ff4cc6ef Author: chao long <wayn...@qq.com> AuthorDate: Tue Mar 26 18:12:22 2019 +0800 KYLIN-3768 Save streaming metadata a standard kylin path in zookeeper --- .../org/apache/kylin/common/KylinConfigBase.java | 4 -- .../java/org/apache/kylin/common/util/ZKUtil.java | 8 ++- .../kylin/provision/BuildCubeWithStream.java | 2 +- .../kylin/realtime/BuildCubeWithStreamV2.java | 14 +---- .../kylin/stream/coordinator/Coordinator.java | 6 +- .../coordinator/StreamMetadataStoreFactory.java | 4 +- .../kylin/stream/coordinator/StreamingUtils.java | 34 ++++++++++ .../apache/kylin/stream/coordinator/ZKUtils.java | 72 ---------------------- .../coordinator/ZookeeperStreamMetadataStore.java | 6 +- .../kylin/stream/coordinator/CoordinatorTest.java | 7 ++- .../stream/server/ReplicaSetLeaderSelector.java | 4 +- .../kylin/stream/server/StreamingServer.java | 4 +- 12 files changed, 59 insertions(+), 106 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 26dc711..ea182d8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2052,10 +2052,6 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.stream.metadata.store.type", "zk"); } - public String getStreamingCoordinateZK() { - return getOptional("kylin.stream.zookeeper", null); - } - public String getStreamingSegmentRetentionPolicy() { return getOptional("kylin.stream.segment.retention.policy", "fullBuild"); } diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java index 5e032f4..5e02a19 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java @@ -106,6 +106,10 @@ public class ZKUtil { return zkString; } + public static String getZkRootBasedPath(String path) { + return zkChRoot + "/" + path; + } + public static CuratorFramework getZookeeperClient(KylinConfig config) { RetryPolicy retryPolicy = getRetryPolicy(config); return getZookeeperClient(getZKConnectString(config), retryPolicy); @@ -215,15 +219,13 @@ public class ZKUtil { }), ","); } - - public static void cleanZkPath(String path) { CuratorFramework zkClient = ZKUtil.newZookeeperClient(); try { zkClient.delete().deletingChildrenIfNeeded().forPath(path); } catch (Exception e) { - logger.warn("Failed to delete zookeeper path: " + path, e); + logger.warn("Failed to delete zookeeper path: {}", path, e); } finally { zkClient.close(); } diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index ef0316c..f09825b 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -88,7 +88,7 @@ public class BuildCubeWithStream { private KafkaConfig kafkaConfig; private MockKafka kafkaServer; private ZkConnection zkConnection; - private final String kafkaZkPath = "/kylin/streaming/" + RandomUtil.randomUUID().toString(); + private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streaming") + "/" + RandomUtil.randomUUID().toString(); protected static boolean fastBuildMode = false; private volatile boolean generateData = true; private volatile boolean generateDataDone = false; diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java index 94d459d..1c4a934 100644 --- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java +++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java @@ -53,7 +53,7 @@ import org.apache.kylin.query.KylinTestBase; import org.apache.kylin.rest.job.StorageCleanupJob; import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock; import org.apache.kylin.stream.coordinator.Coordinator; -import org.apache.kylin.stream.coordinator.ZKUtils; +import org.apache.kylin.stream.coordinator.StreamingUtils; import org.apache.kylin.stream.core.client.ReceiverAdminClient; import org.apache.kylin.stream.core.consumer.ConsumerStartMode; import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol; @@ -81,7 +81,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase { private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamV2.class); private static final String CUBE_NAME = "test_streaming_v2_user_info_cube"; - private final String kafkaZkPath = "/kylin/streamingv2/" + RandomUtil.randomUUID().toString(); + private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streamingv2") + "/" + RandomUtil.randomUUID().toString(); private final String messageFile = "src/test/resources/streaming_v2_user_info_messages.txt"; private String topicName; @@ -152,20 +152,12 @@ public class BuildCubeWithStreamV2 extends KylinTestBase { deployEnv(); final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); -// ExecutableManager jobService = ExecutableManager.getInstance(kylinConfig); scheduler = DefaultScheduler.createInstance(); scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); if (!scheduler.hasStarted()) { throw new RuntimeException("scheduler has not been started"); } -// for (String jobId : jobService.getAllJobIds()) { -// AbstractExecutable executable = jobService.getJob(jobId); -// if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) { -// jobService.deleteJob(jobId); -// } -// } - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME); final String streamingTableName = cubeInstance.getRootFactTable(); final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName); @@ -355,7 +347,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase { } public static void cleanStreamZkRoot() { - ZKUtil.cleanZkPath(ZKUtils.ZK_ROOT); + ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT); } public static void main(String[] args) { diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java index 1e0750e..66a9c01 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java @@ -141,7 +141,7 @@ public class Coordinator implements CoordinatorClient { this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore(); this.receiverAdminClient = new HttpReceiverAdminClient(); this.assigner = getAssigner(); - this.zkClient = ZKUtils.getZookeeperClient(); + this.zkClient = StreamingUtils.getZookeeperClient(); this.selector = new CoordinatorLeaderSelector(); this.jobStatusChecker = new StreamingBuildJobStatusChecker(); this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1, @@ -156,7 +156,7 @@ public class Coordinator implements CoordinatorClient { this.streamMetadataStore = metadataStore; this.receiverAdminClient = receiverClient; this.assigner = getAssigner(); - this.zkClient = ZKUtils.getZookeeperClient(); + this.zkClient = StreamingUtils.getZookeeperClient(); this.selector = new CoordinatorLeaderSelector(); this.jobStatusChecker = new StreamingBuildJobStatusChecker(); this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1, @@ -1273,7 +1273,7 @@ public class Coordinator implements CoordinatorClient { private LeaderSelector leaderSelector; public CoordinatorLeaderSelector() { - String path = ZKUtils.COORDINATOR_LEAD; + String path = StreamingUtils.COORDINATOR_LEAD; leaderSelector = new LeaderSelector(zkClient, path, this); leaderSelector.autoRequeue(); } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java index a2a20de..ac63659 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java @@ -18,7 +18,6 @@ package org.apache.kylin.stream.coordinator; -import org.apache.curator.framework.CuratorFramework; import org.apache.kylin.common.KylinConfig; public class StreamMetadataStoreFactory { @@ -45,8 +44,7 @@ public class StreamMetadataStoreFactory { } public static StreamMetadataStore getZKStreamMetaDataStore() { - CuratorFramework client = ZKUtils.getZookeeperClient(); - StreamMetadataStore store = new ZookeeperStreamMetadataStore(client); + StreamMetadataStore store = new ZookeeperStreamMetadataStore(); return store; } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java new file mode 100644 index 0000000..3500543 --- /dev/null +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.stream.coordinator; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ZKUtil; + + +public class StreamingUtils { + public static final String STREAM_ZK_ROOT = "/stream"; + public static final String COORDINATOR_LEAD = STREAM_ZK_ROOT + "/coordinator"; + public static final String REPLICASETS_LEADER_ELECT = STREAM_ZK_ROOT + "/replica_sets_lead"; + + public static CuratorFramework getZookeeperClient() { + return ZKUtil.getZookeeperClient(KylinConfig.getInstanceFromEnv()); + } +} diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java deleted file mode 100644 index f21f478..0000000 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.stream.coordinator; - -import java.util.Arrays; - -import javax.annotation.Nullable; - -import org.apache.commons.lang3.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; - -public class ZKUtils { - public static final String ZK_ROOT = KylinConfig.getInstanceFromEnv().getZookeeperBasePath() + "/stream/" + KylinConfig.getInstanceFromEnv().getDeployEnv(); - public static final String COORDINATOR_LEAD = ZK_ROOT + "/coordinator"; - public static final String REPLICASETS_LEADER_ELECT = ZK_ROOT + "/replica_sets_lead"; - private static final Logger logger = LoggerFactory.getLogger(ZKUtils.class); - - public static CuratorFramework getZookeeperClient() { - String zkString = KylinConfig.getInstanceFromEnv().getStreamingCoordinateZK(); - if (zkString == null) { - zkString = getHBaseZKConnString(); - logger.info("streaming zk is not config, use hbase zookeeper:{}", zkString); - } - CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkString) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(15 * 1000) - .sessionTimeoutMs(60 * 1000).build(); - client.start(); - return client; - } - - public static String getHBaseZKConnString() { - Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); - final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - return StringUtils.join( - Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { - @Nullable - @Override - public String apply(String input) { - return input + ":" + port; - } - }), ","); - } - -} diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java index 4cad0e5..2c3acb2 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java @@ -61,9 +61,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore { private String cubeRoot; private String coordinatorRoot; - public ZookeeperStreamMetadataStore(CuratorFramework client) { - this.client = client; - this.zkRoot = ZKUtils.ZK_ROOT; + public ZookeeperStreamMetadataStore() { + this.client = StreamingUtils.getZookeeperClient(); + this.zkRoot = StreamingUtils.STREAM_ZK_ROOT; init(); } diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java index 70529a5..591ce8c 100644 --- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java +++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java @@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator; import com.google.common.collect.Lists; import org.apache.curator.test.TestingServer; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.ZKUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.stream.coordinator.exception.ClusterStateException; @@ -102,7 +103,7 @@ public class CoordinatorTest extends LocalFileMetadataTestCase { staticCreateTestMetadata(); testingServer = new TestingServer(12181, false); testingServer.start(); - System.setProperty("kylin.stream.zookeeper", "localhost:12181"); + System.setProperty("kylin.env.zookeeper-connect-string", "localhost:12181"); metadataStore = StreamMetadataStoreFactory.getZKStreamMetaDataStore(); initZookeeperMetadataStore(); mockCube(); @@ -111,8 +112,10 @@ public class CoordinatorTest extends LocalFileMetadataTestCase { @After public void tearDown() throws Exception { coordinator = null; - System.clearProperty("kylin.stream.zookeeper"); + ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT); + StreamingUtils.getZookeeperClient().close(); testingServer.stop();// clear metadata + System.clearProperty("kylin.env.zookeeper-connect-string"); } private void initZookeeperMetadataStore() { diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java index 80a6adc..e7bdbde 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; -import org.apache.kylin.stream.coordinator.ZKUtils; +import org.apache.kylin.stream.coordinator.StreamingUtils; import org.apache.kylin.stream.core.model.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ public class ReplicaSetLeaderSelector extends LeaderSelectorListenerAdapter impl public ReplicaSetLeaderSelector(CuratorFramework client, Node currNode, int replicaSetID) { this.node = currNode; this.replicaSetID = replicaSetID; - String path = ZKUtils.REPLICASETS_LEADER_ELECT + "/" + replicaSetID; + String path = StreamingUtils.REPLICASETS_LEADER_ELECT + "/" + replicaSetID; leaderSelector = new LeaderSelector(client, path, this); leaderSelector.autoRequeue(); leaderChangeListeners = Lists.newArrayList(); diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 32b8a2b..01e4aa4 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -57,7 +57,7 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.stream.coordinator.StreamMetadataStore; import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory; -import org.apache.kylin.stream.coordinator.ZKUtils; +import org.apache.kylin.stream.coordinator.StreamingUtils; import org.apache.kylin.stream.coordinator.client.CoordinatorClient; import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient; import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol; @@ -121,7 +121,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis private String baseStorePath; private StreamingServer() { - streamZKClient = ZKUtils.getZookeeperClient(); + streamZKClient = StreamingUtils.getZookeeperClient(); streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore(); coordinatorClient = new HttpCoordinatorClient(streamMetadataStore); currentNode = NodeUtil.getCurrentNode(DEFAULT_PORT);