KYLIN-2072 Cleanup old streaming code Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5aee0226 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5aee0226 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5aee0226
Branch: refs/heads/master-cdh5.7 Commit: 5aee022612c6fa40c41e8c00063714b79b6d5237 Parents: cb2b12b Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Oct 9 13:10:50 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Oct 10 13:32:44 2016 +0800 ---------------------------------------------------------------------- assembly/pom.xml | 4 - .../kylin/job/streaming/KafkaDataLoader.java | 79 ---- build/bin/cleanup_streaming_files.sh | 42 -- build/bin/kylin.sh | 61 --- build/bin/streaming_build.sh | 33 -- build/bin/streaming_check.sh | 29 -- build/bin/streaming_fillgap.sh | 26 -- build/bin/streaming_rolllog.sh | 29 -- .../metadata/streaming/StreamingConfig.java | 85 ++++ .../metadata/streaming/StreamingManager.java | 248 ++++++++++++ .../.settings/org.eclipse.core.resources.prefs | 6 - .../.settings/org.eclipse.jdt.core.prefs | 386 ------------------- .../.settings/org.eclipse.jdt.ui.prefs | 7 - engine-streaming/pom.xml | 121 ------ .../kylin/engine/streaming/BootstrapConfig.java | 71 ---- .../kylin/engine/streaming/IStreamingInput.java | 30 -- .../engine/streaming/IStreamingOutput.java | 34 -- .../streaming/OneOffStreamingBuilder.java | 71 ---- .../engine/streaming/StreamingBatchBuilder.java | 43 --- .../kylin/engine/streaming/StreamingConfig.java | 85 ---- .../engine/streaming/StreamingManager.java | 248 ------------ .../kylin/engine/streaming/cli/MonitorCLI.java | 88 ----- .../engine/streaming/cli/StreamingCLI.java | 114 ------ .../streaming/cube/StreamingCubeBuilder.java | 168 -------- .../diagnose/StreamingLogAnalyzer.java | 96 ----- .../streaming/monitor/StreamingMonitor.java | 172 --------- .../engine/streaming/util/StreamingUtils.java | 51 --- .../kylin/provision/BuildCubeWithStream.java | 4 +- pom.xml | 6 - .../rest/controller/StreamingController.java | 2 +- .../kylin/rest/controller/TableController.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- .../kylin/rest/service/StreamingService.java | 2 +- source-kafka/pom.xml | 6 - .../kafka/ByteBufferBackedInputStream.java | 52 --- .../apache/kylin/source/kafka/KafkaSource.java | 2 +- .../kylin/source/kafka/KafkaStreamingInput.java | 227 ----------- .../source/kafka/TimedJsonStreamParser.java | 1 + .../kafka/diagnose/KafkaInputAnalyzer.java | 312 --------------- .../source/kafka/diagnose/KafkaVerify.java | 101 ----- .../source/kafka/diagnose/TimeHistogram.java | 85 ---- .../kafka/util/ByteBufferBackedInputStream.java | 52 +++ .../kylin/source/kafka/util/KafkaRequester.java | 191 --------- .../kylin/source/kafka/util/KafkaUtils.java | 173 --------- .../test/java/TimedJsonStreamParserTest.java | 4 +- storage-hbase/pom.xml | 4 - .../hbase/steps/HBaseStreamingOutput.java | 98 ----- .../apache/kylin/tool/CubeMetaExtractor.java | 4 +- 48 files changed, 397 insertions(+), 3360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/pom.xml ---------------------------------------------------------------------- diff --git a/assembly/pom.xml b/assembly/pom.xml index 0c80afc..e6f83a8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -47,10 +47,6 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-mr</artifactId> </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-streaming</artifactId> - </dependency> <!-- Env & Test --> <dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java deleted file mode 100644 index 454f6cf..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job.streaming; - -import java.util.List; -import java.util.Properties; - -import javax.annotation.Nullable; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.source.kafka.config.BrokerConfig; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; - -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; -import org.apache.kylin.source.kafka.config.KafkaConfig; - -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - -/** - * Load prepared data into kafka(for test use) - */ -public class KafkaDataLoader extends StreamDataLoader { - List<KafkaClusterConfig> kafkaClusterConfigs; - - public KafkaDataLoader(KafkaConfig kafkaConfig) { - super(kafkaConfig); - this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs(); - } - - public void loadIntoKafka(List<String> messages) { - - KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0); - String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { - @Nullable - @Override - public String apply(BrokerConfig brokerConfig) { - return brokerConfig.getHost() + ":" + brokerConfig.getPort(); - } - }), ","); - Properties props = new Properties(); - props.put("metadata.broker.list", brokerList); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("request.required.acks", "1"); - props.put("retry.backoff.ms", "1000"); - - ProducerConfig config = new ProducerConfig(props); - - Producer<String, String> producer = new Producer<String, String>(config); - - List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList(); - for (int i = 0; i < messages.size(); ++i) { - KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); - keyedMessages.add(keyedMessage); - } - producer.send(keyedMessages); - producer.close(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/cleanup_streaming_files.sh ---------------------------------------------------------------------- diff --git a/build/bin/cleanup_streaming_files.sh b/build/bin/cleanup_streaming_files.sh deleted file mode 100644 index 9b31a4f..0000000 --- a/build/bin/cleanup_streaming_files.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -if [ $# != 1 ] -then - echo 'invalid input' - exit -1 -fi - -cd $KYLIN_HOME/logs - -for pidfile in `find -L . -name "$1_1*"` -do - pidfile=`echo "$pidfile" | cut -c 3-` - echo "pidfile:$pidfile" - pid=`cat $pidfile` - if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ] - then - echo "pid:$pid still running" - else - echo "pid:$pid not running, try to delete files" - echo $pidfile | xargs rm - echo "streaming_$pidfile.log" | xargs rm - fi -done - http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/kylin.sh ---------------------------------------------------------------------- diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index e767492..039be9f 100644 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -139,67 +139,6 @@ then exit 1 fi -# streaming command -elif [ "$1" == "streaming" ] -then - if [ $# -lt 4 ] - then - echo "invalid input args $@" - exit -1 - fi - if [ "$2" == "start" ] - then - retrieveDependency - source ${dir}/find-kafka-dependency.sh - - # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh - hbase ${KYLIN_EXTRA_START_OPTS} \ - -Dlog4j.configuration=kylin-log4j.properties\ - -Dkylin.hive.dependency=${hive_dependency} \ - -Dkylin.kafka.dependency=${kafka_dependency} \ - -Dkylin.hbase.dependency=${hbase_dependency} \ - org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 & - echo "streaming started name: $3 id: $4" - exit 0 - elif [ "$2" == "stop" ] - then - if [ ! -f "${KYLIN_HOME}/$3_$4" ] - then - echo "streaming is not running, please check" - exit 1 - fi - pid=`cat ${KYLIN_HOME}/$3_$4` - if [ "$pid" = "" ] - then - echo "streaming is not running, please check" - exit 1 - else - echo "stopping streaming:$pid" - kill $pid - fi - rm ${KYLIN_HOME}/$3_$4 - exit 0 - else - echo - fi - -# monitor command -elif [ "$1" == "monitor" ] -then - echo "monitor job" - - retrieveDependency - source ${dir}/find-kafka-dependency.sh - - # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh - hbase ${KYLIN_EXTRA_START_OPTS} \ - -Dlog4j.configuration=kylin-log4j.properties\ - -Dkylin.hive.dependency=${hive_dependency} \ - -Dkylin.kafka.dependency=${kafka_dependency} \ - -Dkylin.hbase.dependency=${hbase_dependency} \ - org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1 - exit 0 - elif [ "$1" = "version" ] then exec hbase -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.common.KylinVersion http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_build.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh deleted file mode 100644 index ed19036..0000000 --- a/build/bin/streaming_build.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -source /etc/profile -source ~/.bash_profile - -CUBE=$1 -INTERVAL=$2 -DELAY=$3 -CURRENT_TIME_IN_SECOND=`date +%s` -CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000)) -START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY)) -END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL)) - -ID="$START"_"$END" -echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_check.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh deleted file mode 100644 index fef0139..0000000 --- a/build/bin/streaming_check.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -source /etc/profile -source ~/.bash_profile - -receivers=$1 -host=$2 -tablename=$3 -authorization=$4 -projectname=$5 -cubename=$6 -sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_fillgap.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh deleted file mode 100644 index c67809a..0000000 --- a/build/bin/streaming_fillgap.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -source /etc/profile -source ~/.bash_profile - -cube=$1 - -cd ${KYLIN_HOME} -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/build/bin/streaming_rolllog.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_rolllog.sh b/build/bin/streaming_rolllog.sh deleted file mode 100644 index 8018eb8..0000000 --- a/build/bin/streaming_rolllog.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -source /etc/profile -source ~/.bash_profile - -KYLIN_LOG_HOME=${KYLIN_HOME}/logs -cd ${KYLIN_LOG_HOME} -timestamp=`date +%Y_%m_%d_%H_%M_%S` -tarfile=logs_archived_at_${timestamp}.tar -files=`find -L . ! -name '*.tar' -type f -mtime +1` # keep two days' log -echo ${files} | xargs tar -cvf ${tarfile} -echo ${files} | xargs rm \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java new file mode 100644 index 0000000..9fd6ede --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.streaming; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.persistence.Serializer; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class StreamingConfig extends RootPersistentEntity { + + public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); + + public static final String STREAMING_TYPE_KAFKA = "kafka"; + + @JsonProperty("name") + private String name; + + @JsonProperty("type") + private String type = STREAMING_TYPE_KAFKA; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getResourcePath() { + return concatResourcePath(name); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public static String concatResourcePath(String name) { + return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; + } + + @Override + public StreamingConfig clone() { + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + SERIALIZER.serialize(this, new DataOutputStream(baos)); + return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))); + } catch (IOException e) { + throw new RuntimeException(e);//in mem, should not happen + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java new file mode 100644 index 0000000..8cfe87d --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class StreamingManager { + + private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); + + // static cached instances + private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>(); + + public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); + + private KylinConfig config; + + // name ==> StreamingConfig + private CaseInsensitiveStringCache<StreamingConfig> streamingMap; + + public static void clearCache() { + CACHE.clear(); + } + + private StreamingManager(KylinConfig config) throws IOException { + this.config = config; + this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming"); + + // touch lower level metadata before registering my listener + reloadAllStreaming(); + Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming"); + } + + private class StreamingSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + clearCache(); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { + if (event == Event.DROP) + removeStreamingLocal(cacheKey); + else + reloadStreamingConfigLocal(cacheKey); + } + } + + private ResourceStore getStore() { + return ResourceStore.getStore(this.config); + } + + public static StreamingManager getInstance(KylinConfig config) { + StreamingManager r = CACHE.get(config); + if (r != null) { + return r; + } + + synchronized (StreamingManager.class) { + r = CACHE.get(config); + if (r != null) { + return r; + } + try { + r = new StreamingManager(config); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + return r; + } catch (IOException e) { + throw new IllegalStateException("Failed to init StreamingManager from " + config, e); + } + } + } + + private static String formatStreamingConfigPath(String name) { + return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; + } + + private static String formatStreamingOutputPath(String streaming, int partition) { + return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; + } + + private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) { + return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json"; + } + + public StreamingConfig getStreamingConfig(String name) { + return streamingMap.get(name); + } + + public List<StreamingConfig> listAllStreaming() { + return new ArrayList<>(streamingMap.values()); + } + + /** + * Reload StreamingConfig from resource store It will be triggered by an desc + * update event. + * + * @param name + * @throws IOException + */ + public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException { + + // Save Source + String path = StreamingConfig.concatResourcePath(name); + + // Reload the StreamingConfig + StreamingConfig ndesc = loadStreamingConfigAt(path); + + // Here replace the old one + streamingMap.putLocal(ndesc.getName(), ndesc); + return ndesc; + } + + // remove streamingConfig + public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException { + String path = streamingConfig.getResourcePath(); + getStore().deleteResource(path); + streamingMap.remove(streamingConfig.getName()); + } + + public StreamingConfig getConfig(String name) { + name = name.toUpperCase(); + return streamingMap.get(name); + } + + public void removeStreamingLocal(String streamingName) { + streamingMap.removeLocal(streamingName); + } + + /** + * Update CubeDesc with the input. Broadcast the event into cluster + * + * @param desc + * @return + * @throws IOException + */ + public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException { + // Validate CubeDesc + if (desc.getUuid() == null || desc.getName() == null) { + throw new IllegalArgumentException("SteamingConfig Illegal."); + } + String name = desc.getName(); + if (!streamingMap.containsKey(name)) { + throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist."); + } + + // Save Source + String path = desc.getResourcePath(); + getStore().putResource(path, desc, STREAMING_SERIALIZER); + + // Reload the StreamingConfig + StreamingConfig ndesc = loadStreamingConfigAt(path); + // Here replace the old one + streamingMap.put(ndesc.getName(), desc); + + return ndesc; + } + + public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException { + if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) { + throw new IllegalArgumentException(); + } + + if (streamingMap.containsKey(streamingConfig.getName())) + throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists"); + + String path = StreamingConfig.concatResourcePath(streamingConfig.getName()); + getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER); + streamingMap.put(streamingConfig.getName(), streamingConfig); + return streamingConfig; + } + + private StreamingConfig loadStreamingConfigAt(String path) throws IOException { + ResourceStore store = getStore(); + StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER); + + if (StringUtils.isBlank(streamingDesc.getName())) { + throw new IllegalStateException("StreamingConfig name must not be blank"); + } + return streamingDesc; + } + + private void reloadAllStreaming() throws IOException { + ResourceStore store = getStore(); + logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT)); + + streamingMap.clear(); + + List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); + for (String path : paths) { + StreamingConfig streamingConfig; + try { + streamingConfig = loadStreamingConfigAt(path); + } catch (Exception e) { + logger.error("Error loading streaming desc " + path, e); + continue; + } + if (path.equals(streamingConfig.getResourcePath()) == false) { + logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath()); + continue; + } + if (streamingMap.containsKey(streamingConfig.getName())) { + logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path); + continue; + } + + streamingMap.putLocal(streamingConfig.getName(), streamingConfig); + } + + logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)"); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.core.resources.prefs ---------------------------------------------------------------------- diff --git a/engine-streaming/.settings/org.eclipse.core.resources.prefs b/engine-streaming/.settings/org.eclipse.core.resources.prefs deleted file mode 100644 index 29abf99..0000000 --- a/engine-streaming/.settings/org.eclipse.core.resources.prefs +++ /dev/null @@ -1,6 +0,0 @@ -eclipse.preferences.version=1 -encoding//src/main/java=UTF-8 -encoding//src/main/resources=UTF-8 -encoding//src/test/java=UTF-8 -encoding//src/test/resources=UTF-8 -encoding/<project>=UTF-8 http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.core.prefs ---------------------------------------------------------------------- diff --git a/engine-streaming/.settings/org.eclipse.jdt.core.prefs b/engine-streaming/.settings/org.eclipse.jdt.core.prefs deleted file mode 100644 index 5aaaf1e..0000000 --- a/engine-streaming/.settings/org.eclipse.jdt.core.prefs +++ /dev/null @@ -1,386 +0,0 @@ -eclipse.preferences.version=1 -org.eclipse.jdt.core.compiler.annotation.inheritNullAnnotations=disabled -org.eclipse.jdt.core.compiler.annotation.missingNonNullByDefaultAnnotation=ignore -org.eclipse.jdt.core.compiler.annotation.nonnull=org.eclipse.jdt.annotation.NonNull -org.eclipse.jdt.core.compiler.annotation.nonnull.secondary= -org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annotation.NonNullByDefault -org.eclipse.jdt.core.compiler.annotation.nonnullbydefault.secondary= -org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable -org.eclipse.jdt.core.compiler.annotation.nullable.secondary= -org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled -org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled -org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate -org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7 -org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve -org.eclipse.jdt.core.compiler.compliance=1.7 -org.eclipse.jdt.core.compiler.debug.lineNumber=generate -org.eclipse.jdt.core.compiler.debug.localVariable=generate -org.eclipse.jdt.core.compiler.debug.sourceFile=generate -org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=warning -org.eclipse.jdt.core.compiler.problem.assertIdentifier=error -org.eclipse.jdt.core.compiler.problem.autoboxing=ignore -org.eclipse.jdt.core.compiler.problem.comparingIdentical=warning -org.eclipse.jdt.core.compiler.problem.deadCode=warning -org.eclipse.jdt.core.compiler.problem.deprecation=warning -org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled -org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled -org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore -org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore -org.eclipse.jdt.core.compiler.problem.enumIdentifier=error -org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=ignore -org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore -org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled -org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore -org.eclipse.jdt.core.compiler.problem.finalParameterBound=warning -org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=warning -org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore -org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=warning -org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled -org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=warning -org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=warning -org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore -org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore -org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=warning -org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore -org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore -org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled -org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore -org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore -org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=enabled -org.eclipse.jdt.core.compiler.problem.missingSerialVersion=warning -org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore -org.eclipse.jdt.core.compiler.problem.noEffectAssignment=warning -org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=warning -org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore -org.eclipse.jdt.core.compiler.problem.nonnullParameterAnnotationDropped=warning -org.eclipse.jdt.core.compiler.problem.nonnullTypeVariableFromLegacyInvocation=warning -org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error -org.eclipse.jdt.core.compiler.problem.nullReference=warning -org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error -org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=warning -org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=warning -org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore -org.eclipse.jdt.core.compiler.problem.pessimisticNullAnalysisForFreeTypeVariables=warning -org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore -org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore -org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=ignore -org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore -org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=warning -org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore -org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore -org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore -org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore -org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore -org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled -org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=warning -org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=disabled -org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled -org.eclipse.jdt.core.compiler.problem.syntacticNullAnalysisForFields=disabled -org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore -org.eclipse.jdt.core.compiler.problem.typeParameterHiding=warning -org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=enabled -org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore -org.eclipse.jdt.core.compiler.problem.unclosedCloseable=warning -org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore -org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=warning -org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore -org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore -org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore -org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore -org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=enabled -org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=enabled -org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=disabled -org.eclipse.jdt.core.compiler.problem.unusedExceptionParameter=ignore -org.eclipse.jdt.core.compiler.problem.unusedImport=warning -org.eclipse.jdt.core.compiler.problem.unusedLabel=warning -org.eclipse.jdt.core.compiler.problem.unusedLocal=warning -org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore -org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore -org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=enabled -org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=disabled -org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled -org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning -org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore -org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning -org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning -org.eclipse.jdt.core.compiler.source=1.7 -org.eclipse.jdt.core.formatter.align_type_members_on_columns=false -org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16 -org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=0 -org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16 -org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16 -org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16 -org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16 -org.eclipse.jdt.core.formatter.alignment_for_assignment=0 -org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16 -org.eclipse.jdt.core.formatter.alignment_for_compact_if=16 -org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80 -org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0 -org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16 -org.eclipse.jdt.core.formatter.alignment_for_method_declaration=0 -org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16 -org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80 -org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16 -org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16 -org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16 -org.eclipse.jdt.core.formatter.blank_lines_after_imports=1 -org.eclipse.jdt.core.formatter.blank_lines_after_package=1 -org.eclipse.jdt.core.formatter.blank_lines_before_field=0 -org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0 -org.eclipse.jdt.core.formatter.blank_lines_before_imports=1 -org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1 -org.eclipse.jdt.core.formatter.blank_lines_before_method=1 -org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1 -org.eclipse.jdt.core.formatter.blank_lines_before_package=0 -org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1 -org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1 -org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line -org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line -org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false -org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false -org.eclipse.jdt.core.formatter.comment.format_block_comments=false -org.eclipse.jdt.core.formatter.comment.format_header=false -org.eclipse.jdt.core.formatter.comment.format_html=true -org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=false -org.eclipse.jdt.core.formatter.comment.format_line_comments=false -org.eclipse.jdt.core.formatter.comment.format_source_code=true -org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true -org.eclipse.jdt.core.formatter.comment.indent_root_tags=true -org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert -org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert -org.eclipse.jdt.core.formatter.comment.line_length=80 -org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true -org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries=true -org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments=false -org.eclipse.jdt.core.formatter.compact_else_if=true -org.eclipse.jdt.core.formatter.continuation_indentation=2 -org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2 -org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off -org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on -org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false -org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true -org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true -org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true -org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true -org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true -org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true -org.eclipse.jdt.core.formatter.indent_empty_lines=false -org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true -org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true -org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true -org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false -org.eclipse.jdt.core.formatter.indentation.size=4 -org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert -org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert -org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert -org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert -org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert -org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert -org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert -org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert -org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert -org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert -org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert -org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert -org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert -org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert -org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert -org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert -org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert -org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert -org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert -org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert -org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert -org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert -org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert -org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert -org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert -org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert -org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert -org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert -org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert -org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert -org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert -org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert -org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert -org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert -org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert -org.eclipse.jdt.core.formatter.join_lines_in_comments=true -org.eclipse.jdt.core.formatter.join_wrapped_lines=true -org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false -org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false -org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false -org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false -org.eclipse.jdt.core.formatter.lineSplit=999 -org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false -org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false -org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0 -org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1 -org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true -org.eclipse.jdt.core.formatter.tabulation.char=space -org.eclipse.jdt.core.formatter.tabulation.size=4 -org.eclipse.jdt.core.formatter.use_on_off_tags=false -org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false -org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true -org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true -org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/.settings/org.eclipse.jdt.ui.prefs ---------------------------------------------------------------------- diff --git a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs b/engine-streaming/.settings/org.eclipse.jdt.ui.prefs deleted file mode 100644 index d521bab..0000000 --- a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs +++ /dev/null @@ -1,7 +0,0 @@ -eclipse.preferences.version=1 -formatter_profile=_Space Indent & Long Lines -formatter_settings_version=12 -org.eclipse.jdt.ui.ignorelowercasenames=true -org.eclipse.jdt.ui.importorder=java;javax;org;com; -org.eclipse.jdt.ui.ondemandthreshold=99 -org.eclipse.jdt.ui.staticondemandthreshold=99 http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml deleted file mode 100644 index 876279d..0000000 --- a/engine-streaming/pom.xml +++ /dev/null @@ -1,121 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>kylin-engine-streaming</artifactId> - <packaging>jar</packaging> - <name>Apache Kylin - Streaming Engine</name> - - <parent> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin</artifactId> - <version>1.6.0-SNAPSHOT</version> - - </parent> - - <properties> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-cube</artifactId> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-storage</artifactId> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-job</artifactId> - </dependency> - - <!-- Env & Test --> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-common</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>provided</scope> - <!-- protobuf version conflict with hbase --> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-app</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.mrunit</groupId> - <artifactId>mrunit</artifactId> - <classifier>hadoop2</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java deleted file mode 100644 index 35bdfa8..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.streaming; - -/** - */ -public class BootstrapConfig { - - private String cubeName; - private long start = 0L; - private long end = 0L; - - private boolean fillGap; - private long maxFillGapRange = 4 * 3600 * 1000L; - - public long getStart() { - return start; - } - - public void setStart(long start) { - this.start = start; - } - - public long getEnd() { - return end; - } - - public void setEnd(long end) { - this.end = end; - } - - public String getCubeName() { - return cubeName; - } - - public void setCubeName(String cubeName) { - this.cubeName = cubeName; - } - - public boolean isFillGap() { - return fillGap; - } - - public void setFillGap(boolean fillGap) { - this.fillGap = fillGap; - } - - public long getMaxFillGapRange() { - return maxFillGapRange; - } - - public void setMaxFillGapRange(long maxFillGapRange) { - this.maxFillGapRange = maxFillGapRange; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java deleted file mode 100644 index c583283..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming; - -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.metadata.realization.RealizationType; - -/** - */ -public interface IStreamingInput { - - StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime); - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java deleted file mode 100644 index cb15e2b..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming; - -import java.util.Map; - -import org.apache.kylin.cube.inmemcubing.ICuboidWriter; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.model.IBuildable; - -/** - */ -public interface IStreamingOutput { - - ICuboidWriter getCuboidWriter(IBuildable buildable); - - void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java deleted file mode 100644 index c9da46e..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming; - -import java.util.Map; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.engine.streaming.util.StreamingUtils; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.RealizationType; - -import com.google.common.base.Preconditions; - -/** - */ -public class OneOffStreamingBuilder { - - private final IStreamingInput streamingInput; - private final IStreamingOutput streamingOutput; - private final StreamingBatchBuilder streamingBatchBuilder; - private final long startTime; - private final long endTime; - private final RealizationType realizationType; - private final String realizationName; - - public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) { - Preconditions.checkArgument(startTime < endTime); - this.startTime = startTime; - this.endTime = endTime; - this.realizationType = Preconditions.checkNotNull(realizationType); - this.realizationName = Preconditions.checkNotNull(realizationName); - this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput()); - this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput()); - this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName)); - } - - public Runnable build() { - return new Runnable() { - @Override - public void run() { - StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime); - final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch); - final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch); - final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable); - streamingBatchBuilder.build(streamingBatch, dictionaryMap, streamingOutput.getCuboidWriter(buildable)); - streamingOutput.output(buildable, samplingResult); - streamingBatchBuilder.commit(buildable); - } - }; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java deleted file mode 100644 index 8b0b8e6..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming; - -import java.util.Map; - -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.cube.inmemcubing.ICuboidWriter; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.TblColRef; - -/** - */ -public interface StreamingBatchBuilder { - - IBuildable createBuildable(StreamingBatch streamingBatch); - - Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch); - - Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable); - - void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter); - - void commit(IBuildable buildable); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java deleted file mode 100644 index 9d1a0b1..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.persistence.Serializer; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - */ -@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class StreamingConfig extends RootPersistentEntity { - - public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); - - public static final String STREAMING_TYPE_KAFKA = "kafka"; - - @JsonProperty("name") - private String name; - - @JsonProperty("type") - private String type = STREAMING_TYPE_KAFKA; - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getResourcePath() { - return concatResourcePath(name); - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public static String concatResourcePath(String name) { - return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; - } - - @Override - public StreamingConfig clone() { - try { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - SERIALIZER.serialize(this, new DataOutputStream(baos)); - return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))); - } catch (IOException e) { - throw new RuntimeException(e);//in mem, should not happen - } - } - -}