http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java deleted file mode 100644 index 271bf41..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.cachesync.Broadcaster; -import org.apache.kylin.metadata.cachesync.Broadcaster.Event; -import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class StreamingManager { - - private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class); - - // static cached instances - private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>(); - - public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); - - private KylinConfig config; - - // name ==> StreamingConfig - private CaseInsensitiveStringCache<StreamingConfig> streamingMap; - - public static void clearCache() { - CACHE.clear(); - } - - private StreamingManager(KylinConfig config) throws IOException { - this.config = config; - this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming"); - - // touch lower level metadata before registering my listener - reloadAllStreaming(); - Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming"); - } - - private class StreamingSyncListener extends Broadcaster.Listener { - @Override - public void onClearAll(Broadcaster broadcaster) throws IOException { - clearCache(); - } - - @Override - public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { - if (event == Event.DROP) - removeStreamingLocal(cacheKey); - else - 
reloadStreamingConfigLocal(cacheKey); - } - } - - private ResourceStore getStore() { - return ResourceStore.getStore(this.config); - } - - public static StreamingManager getInstance(KylinConfig config) { - StreamingManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (StreamingManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new StreamingManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exists"); - } - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init StreamingManager from " + config, e); - } - } - } - - private static String formatStreamingConfigPath(String name) { - return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; - } - - private static String formatStreamingOutputPath(String streaming, int partition) { - return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; - } - - private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) { - return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json"; - } - - public StreamingConfig getStreamingConfig(String name) { - return streamingMap.get(name); - } - - public List<StreamingConfig> listAllStreaming() { - return new ArrayList<>(streamingMap.values()); - } - - /** - * Reload StreamingConfig from the resource store. Triggered by a desc - * update event. - * - * @param name - * @throws IOException - */ - public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException { - - // Resolve the resource path - String path = StreamingConfig.concatResourcePath(name); - - // Reload the StreamingConfig - StreamingConfig ndesc = loadStreamingConfigAt(path); - - // Here replace the old one - streamingMap.putLocal(ndesc.getName(), ndesc); - return ndesc; - } - - // remove streamingConfig - public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException { - String path = streamingConfig.getResourcePath(); - getStore().deleteResource(path); - streamingMap.remove(streamingConfig.getName()); - } - - public StreamingConfig getConfig(String name) { - name = name.toUpperCase(); - return streamingMap.get(name); - } - - public void removeStreamingLocal(String streamingName) { - streamingMap.removeLocal(streamingName); - } - - /** - * Update StreamingConfig with the input. 
Broadcast the event across the cluster. - * - * @param desc - * @return - * @throws IOException - */ - public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException { - // Validate StreamingConfig - if (desc.getUuid() == null || desc.getName() == null) { - throw new IllegalArgumentException("StreamingConfig is illegal."); - } - String name = desc.getName(); - if (!streamingMap.containsKey(name)) { - throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist."); - } - - // Save Source - String path = desc.getResourcePath(); - getStore().putResource(path, desc, STREAMING_SERIALIZER); - - // Reload the StreamingConfig - StreamingConfig ndesc = loadStreamingConfigAt(path); - // Here replace the old one - streamingMap.put(ndesc.getName(), desc); - - return ndesc; - } - - public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException { - if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) { - throw new IllegalArgumentException(); - } - - if (streamingMap.containsKey(streamingConfig.getName())) - throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists"); - - String path = StreamingConfig.concatResourcePath(streamingConfig.getName()); - getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER); - streamingMap.put(streamingConfig.getName(), streamingConfig); - return streamingConfig; - } - - private StreamingConfig loadStreamingConfigAt(String path) throws IOException { - ResourceStore store = getStore(); - StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER); - - if (StringUtils.isBlank(streamingDesc.getName())) { - throw new IllegalStateException("StreamingConfig name must not be blank"); - } - return streamingDesc; - } - - private void reloadAllStreaming() throws IOException { - ResourceStore store = getStore(); - logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT)); - - streamingMap.clear(); - - List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); - for (String path : paths) { - StreamingConfig streamingConfig; - try { - streamingConfig = loadStreamingConfigAt(path); - } catch (Exception e) { - logger.error("Error loading streaming desc " + path, e); - continue; - } - if (!path.equals(streamingConfig.getResourcePath())) { - logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath()); - continue; - } - if (streamingMap.containsKey(streamingConfig.getName())) { - logger.error("Duplicate StreamingConfig name '" + streamingConfig.getName() + "' on path " + path); - continue; - } - - streamingMap.putLocal(streamingConfig.getName(), streamingConfig); - } - - logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)"); - } -}
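The StreamingManager deleted above follows the idiom Kylin uses for its metadata managers: one instance per KylinConfig, cached in a static ConcurrentHashMap and created under double-checked locking, with the name-to-config map reloaded from the ResourceStore before the cache-sync listener is registered. A minimal sketch of just that locking pattern, with hypothetical names (Config, ExampleManager) standing in for the Kylin types:

import java.util.concurrent.ConcurrentHashMap;

class Config { /* stand-in for KylinConfig */ }

class ExampleManager {
    // one manager per Config, shared across threads
    private static final ConcurrentHashMap<Config, ExampleManager> CACHE = new ConcurrentHashMap<>();

    private final Config config;

    private ExampleManager(Config config) {
        this.config = config;
        // the real class reloads all StreamingConfigs here, then registers its listener
    }

    static ExampleManager getInstance(Config config) {
        ExampleManager r = CACHE.get(config); // fast path, lock-free
        if (r != null) {
            return r;
        }
        synchronized (ExampleManager.class) {
            r = CACHE.get(config); // second check, now under the lock
            if (r != null) {
                return r;
            }
            r = new ExampleManager(config);
            CACHE.put(config, r);
            return r;
        }
    }
}

The second lookup under the lock is what prevents two racing threads from constructing two managers for the same config; the "More than one singleton exists" warning in the deleted code fires only when several distinct KylinConfig instances are alive in one JVM.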
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java deleted file mode 100644 index 32030ad..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.streaming.cli; - -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - */ -public class MonitorCLI { - - private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class); - - public static void main(String[] args) { - Preconditions.checkArgument(args[0].equals("monitor")); - - int i = 1; - List<String> receivers = null; - String host = null; - String tableName = null; - String authorization = null; - String cubeName = null; - String projectName = "default"; - while (i < args.length) { - String argName = args[i]; - switch (argName) { - case "-receivers": - receivers = Lists.newArrayList(StringUtils.split(args[++i], ";")); - break; - case "-host": - host = args[++i]; - break; - case "-tableName": - tableName = args[++i]; - break; - case "-authorization": - authorization = args[++i]; - break; - case "-cubeName": - cubeName = args[++i]; - break; - case "-projectName": - projectName = args[++i]; - break; - default: - throw new RuntimeException("invalid argName:" + argName); - } - i++; - } - Preconditions.checkArgument(receivers != null && receivers.size() > 0); - final StreamingMonitor streamingMonitor = new StreamingMonitor(); - if (tableName != null) { - logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";"))); - Preconditions.checkNotNull(host); - Preconditions.checkNotNull(authorization); - Preconditions.checkNotNull(tableName); - streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName); - } - if (cubeName != null) { - logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";"))); - streamingMonitor.checkCube(receivers, cubeName, host); - } - System.exit(0); - } -} 
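MonitorCLI consumes its arguments as flag/value pairs: the while loop reads a flag at position i, and args[++i] consumes the value that follows it. A stripped-down sketch of the same idiom (illustrative names, not the Kylin class):

public class ArgLoopSketch {
    public static void main(String[] args) {
        String host = null;
        String cubeName = null;
        int i = 0;
        while (i < args.length) {
            switch (args[i]) {
            case "-host":
                host = args[++i]; // the value follows its flag; ++i consumes it
                break;
            case "-cubeName":
                cubeName = args[++i];
                break;
            default:
                throw new RuntimeException("invalid argName:" + args[i]);
            }
            i++; // advance to the next flag
        }
        System.out.println("host=" + host + " cubeName=" + cubeName);
    }
}

One caveat the original shares: a trailing flag with no value makes args[++i] throw ArrayIndexOutOfBoundsException rather than a usage error.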
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java deleted file mode 100644 index 1d66b41..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming.cli; - -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.engine.streaming.BootstrapConfig; -import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; -import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; -import org.apache.kylin.metadata.realization.RealizationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class StreamingCLI { - - private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class); - - public static void main(String[] args) { - try { - Preconditions.checkArgument(args[0].equals("streaming")); - Preconditions.checkArgument(args[1].equals("start")); - - int i = 2; - BootstrapConfig bootstrapConfig = new BootstrapConfig(); - while (i < args.length) { - String argName = args[i]; - switch (argName) { - case "-start": - bootstrapConfig.setStart(Long.parseLong(args[++i])); - break; - case "-end": - bootstrapConfig.setEnd(Long.parseLong(args[++i])); - break; - case "-cube": - bootstrapConfig.setCubeName(args[++i]); - break; - case "-fillGap": - bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i])); - break; - case "-maxFillGapRange": - bootstrapConfig.setMaxFillGapRange(Long.parseLong(args[++i])); - break; - default: - logger.warn("ignore this arg:" + argName); - } - i++; - } - if (bootstrapConfig.isFillGap()) { - final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName()); - logger.info("all gaps:" + StringUtils.join(gaps, ",")); - for (Pair<Long, Long> gap : gaps) { - List<Pair<Long, Long>> splitGaps = splitGap(gap, bootstrapConfig.getMaxFillGapRange()); - for (Pair<Long, Long> splitGap : splitGaps) { - logger.info("start filling the gap from " + splitGap.getFirst() + " to " + splitGap.getSecond()); - startOneOffCubeStreaming(bootstrapConfig.getCubeName(), splitGap.getFirst(), splitGap.getSecond()); - logger.info("finish filling the gap from " + splitGap.getFirst() + " to " + 
splitGap.getSecond()); - } - } - } else { - startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()); - logger.info("streaming process finished, exit with 0"); - System.exit(0); - } - } catch (Exception e) { - printArgsError(args); - logger.error("error start streaming", e); - System.exit(-1); - } - } - - private static List<Pair<Long, Long>> splitGap(Pair<Long, Long> gap, long maxFillGapRange) { - List<Pair<Long, Long>> gaps = Lists.newArrayList(); - Long startTime = gap.getFirst(); - - while (startTime < gap.getSecond()) { - Long endTime = gap.getSecond() <= startTime + maxFillGapRange ? gap.getSecond() : startTime + maxFillGapRange; - gaps.add(Pair.newPair(startTime, endTime)); - startTime = endTime; - } - - return gaps; - } - - private static void startOneOffCubeStreaming(String cubeName, long start, long end) { - final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build(); - runnable.run(); - } - - private static void printArgsError(String[] args) { - logger.warn("invalid args:" + StringUtils.join(args, " ")); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java deleted file mode 100644 index 350a5f8..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.streaming.cube; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.annotation.Nullable; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.cube.inmemcubing.ICuboidWriter; -import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder; -import org.apache.kylin.cube.util.CubingUtils; -import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.streaming.StreamingBatchBuilder; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; - -/** - */ -public class StreamingCubeBuilder implements StreamingBatchBuilder { - - private static final Logger logger = LoggerFactory.getLogger(StreamingCubeBuilder.class); - - private final String cubeName; - private int processedRowCount = 0; - - public StreamingCubeBuilder(String cubeName) { - this.cubeName = cubeName; - } - - @Override - public void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter) { - try { - CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); - final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor()); - - LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(); - InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap); - final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter)); - processedRowCount = streamingBatch.getMessages().size(); - for (StreamingMessage streamingMessage : streamingBatch.getMessages()) { - blockingQueue.put(streamingMessage.getData()); - } - blockingQueue.put(Collections.<String> emptyList()); - future.get(); - cuboidWriter.flush(); - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException("error build cube from StreamingBatch", e.getCause()); - } catch (IOException e) { - throw new RuntimeException("error build cube from StreamingBatch", e.getCause()); - } finally { - try { - cuboidWriter.close(); - } catch (IOException e) { - throw new RuntimeException("error build cube from StreamingBatch", e.getCause()); - } - } - } - - @Override - public IBuildable createBuildable(StreamingBatch streamingBatch) { - CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); - 
try { - CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond()); - segment.setLastBuildJobID(segment.getUuid()); // give a fake job id - segment.setInputRecords(streamingBatch.getMessages().size()); - segment.setLastBuildTime(System.currentTimeMillis()); - return segment; - } catch (IOException e) { - throw new RuntimeException("failed to create IBuildable", e); - } - } - - @Override - public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) { - final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); - final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor()); - long start = System.currentTimeMillis(); - - final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() { - @Nullable - @Override - public List<String> apply(@Nullable StreamingMessage input) { - return input.getData(); - } - })); - logger.info(String.format("sampling of %d messages cost %d ms", streamingBatch.getMessages().size(), (System.currentTimeMillis() - start))); - return samplingResult; - } - - @Override - public Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable) { - final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); - final Map<TblColRef, Dictionary<String>> dictionaryMap; - try { - dictionaryMap = CubingUtils.buildDictionary(cubeInstance, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() { - @Nullable - @Override - public List<String> apply(@Nullable StreamingMessage input) { - return input.getData(); - } - })); - Map<TblColRef, Dictionary<String>> realDictMap = CubingUtils.writeDictionary((CubeSegment) buildable, dictionaryMap, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond()); - return realDictMap; - } catch (IOException e) { - throw new RuntimeException("failed to build dictionary", e); - } - } - - @Override - public void commit(IBuildable buildable) { - CubeSegment cubeSegment = (CubeSegment) buildable; - cubeSegment.setStatus(SegmentStatusEnum.READY); - cubeSegment.setInputRecords(processedRowCount); - CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance()); - cubeBuilder.setToUpdateSegs(cubeSegment); - try { - CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cubeBuilder); - } catch (IOException e) { - throw new RuntimeException("failed to update CubeSegment", e); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java deleted file mode 100644 index fba664d..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.streaming.diagnose; - -import java.io.File; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.TimeZone; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.io.FileUtils; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class StreamingLogAnalyzer { - public static void main(String[] args) { - int errorFileCount = 0; - List<Long> ellapsedTimes = Lists.newArrayList(); - - String patternStr = "(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})"; - Pattern pattern = Pattern.compile(patternStr); - - SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); - format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly - - Preconditions.checkArgument(args.length == 1, "Usage: StreamingLogsAnalyser streaming_logs_folder"); - for (File file : FileUtils.listFiles(new File(args[0]), new String[] { "log" }, false)) { - System.out.println("Processing file " + file.toString()); - - long startTime = 0; - long endTime = 0; - try { - List<String> contents = Files.readAllLines(file.toPath(), Charset.defaultCharset()); - for (int i = 0; i < contents.size(); ++i) { - Matcher m = pattern.matcher(contents.get(i)); - if (m.find()) { - startTime = format.parse("20" + m.group(1)).getTime(); - break; - } - } - - for (int i = contents.size() - 1; i >= 0; --i) { - Matcher m = pattern.matcher(contents.get(i)); - if (m.find()) { - endTime = format.parse("20" + m.group(1)).getTime(); - break; - } - } - - if (startTime == 0 || endTime == 0) { - throw new RuntimeException("start time or end time is not found"); - } - - if (endTime - startTime < 60000) { - System.out.println("Warning: this job took less than one minute!!!! 
" + file.toString()); - } - - ellapsedTimes.add(endTime - startTime); - - } catch (Exception e) { - System.out.println("Exception when processing log file " + file.toString()); - System.out.println(e); - errorFileCount++; - } - } - - System.out.println("Totally error files count " + errorFileCount); - System.out.println("Totally normal files processed " + ellapsedTimes.size()); - - long sum = 0; - for (Long x : ellapsedTimes) { - sum += x; - } - System.out.println("Avg build time " + (sum / ellapsedTimes.size())); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java deleted file mode 100644 index 55252c4..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.engine.streaming.monitor; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; - -import javax.annotation.Nullable; - -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; -import org.apache.commons.httpclient.methods.PostMethod; -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.MailService; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - */ -public class StreamingMonitor { - - private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class); - - public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) { - String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") "; - StringBuilder stringBuilder = new StringBuilder(); - String url = host + "/kylin/api/query"; - PostMethod request = new PostMethod(url); - try { - - request.addRequestHeader("Authorization", "Basic " + authorization); - request.addRequestHeader("Content-Type", "application/json"); - String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName); - request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes())); - - int statusCode = new HttpClient().executeMethod(request); - String msg = Bytes.toString(request.getResponseBody()); - stringBuilder.append("host:").append(host).append("\n"); - stringBuilder.append("query:").append(query).append("\n"); - stringBuilder.append("statusCode:").append(statusCode).append("\n"); - if (statusCode == 200) { - title += "succeed"; - final HashMap<?, ?> hashMap = JsonUtil.readValue(msg, HashMap.class); - stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n"); - stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n"); - } else { - title += "failed"; - stringBuilder.append("response:").append(msg).append("\n"); - } - } catch (Exception e) { - final StringWriter out = new StringWriter(); - e.printStackTrace(new PrintWriter(out)); - title += "failed"; - stringBuilder.append(out.toString()); - } finally { - request.releaseConnection(); - } - logger.info("title:" + title); - logger.info("content:" + stringBuilder.toString()); - sendMail(receivers, title, stringBuilder.toString()); - } - - public static final List<Pair<Long, Long>> findGaps(String cubeName) { - List<CubeSegment> segments = getSortedReadySegments(cubeName); - List<Pair<Long, Long>> gaps = Lists.newArrayList(); - for (int i = 0; i < segments.size() - 1; ++i) { - CubeSegment first = segments.get(i); - CubeSegment second = segments.get(i + 1); - if (first.getDateRangeEnd() == second.getDateRangeStart()) { - continue; - } else if (first.getDateRangeEnd() < second.getDateRangeStart()) { - 
gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart())); - } - } - return gaps; - } - - private static List<CubeSegment> getSortedReadySegments(String cubeName) { - final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); - Preconditions.checkNotNull(cube); - final List<CubeSegment> segments = cube.getSegments(SegmentStatusEnum.READY); - logger.info("totally " + segments.size() + " cubeSegments"); - Collections.sort(segments); - return segments; - } - - public static final List<Pair<String, String>> findOverlaps(String cubeName) { - List<CubeSegment> segments = getSortedReadySegments(cubeName); - List<Pair<String, String>> overlaps = Lists.newArrayList(); - for (int i = 0; i < segments.size() - 1; ++i) { - CubeSegment first = segments.get(i); - CubeSegment second = segments.get(i + 1); - if (first.getDateRangeEnd() == second.getDateRangeStart()) { - continue; - } else { - overlaps.add(Pair.newPair(first.getName(), second.getName())); - } - } - return overlaps; - } - - public void checkCube(List<String> receivers, String cubeName, String host) { - final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); - if (cube == null) { - logger.info("cube:" + cubeName + " does not exist"); - return; - } - List<Pair<Long, Long>> gaps = findGaps(cubeName); - List<Pair<String, String>> overlaps = Lists.newArrayList(); - StringBuilder content = new StringBuilder(); - if (!gaps.isEmpty()) { - content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() { - @Nullable - @Override - public String apply(Pair<Long, Long> input) { - return parseInterval(input); - } - }), "\n")).append("\n"); - } - if (!overlaps.isEmpty()) { - content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n"); - } - if (content.length() > 0) { - logger.info(content.toString()); - sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString()); - } else { - logger.info("no gaps or overlaps"); - } - } - - private String parseInterval(Pair<Long, Long> interval) { - return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString()); - } - - private void sendMail(List<String> receivers, String title, String content) { - final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv()); - mailService.sendMail(receivers, title, content, false); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java deleted file mode 100644 index 5790bc1..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.streaming.util; - -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.engine.streaming.IStreamingInput; -import org.apache.kylin.engine.streaming.IStreamingOutput; -import org.apache.kylin.engine.streaming.StreamingBatchBuilder; -import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder; -import org.apache.kylin.metadata.realization.RealizationType; - -import com.google.common.base.Preconditions; - -/** - * TODO: like MRUtil, use Factory pattern to allow config - */ -public class StreamingUtils { - - public static IStreamingInput getStreamingInput() { - return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput"); - } - - public static IStreamingOutput getStreamingOutput() { - return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput"); - } - - public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) { - Preconditions.checkNotNull(realizationName); - if (realizationType == RealizationType.CUBE) { - return new StreamingCubeBuilder(realizationName); - } else { - throw new UnsupportedOperationException("not implemented yet"); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index c30abc0..a47fcde 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -44,8 +44,8 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.metadata.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 09ef0e8..72e4069 100644 --- a/pom.xml +++ b/pom.xml @@ -225,11 +225,6 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-streaming</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-spark</artifactId> <version>${project.version}</version> 
</dependency> @@ -1017,7 +1012,6 @@ <module>core-job</module> <module>core-storage</module> <module>engine-mr</module> - <module>engine-streaming</module> <module>engine-spark</module> <module>source-hive</module> <module>source-kafka</module> http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index f3374c3..a5fb874 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -26,7 +26,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.rest.exception.BadRequestException; http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java index c4af5f4..34cc57f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -32,7 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java index abf0638..170c395 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; 
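These import rewrites, here and in the hunks that follow, are the mechanical fallout of the move: engine-streaming disappears from the root pom's dependencies and module list, and every consumer rewrites org.apache.kylin.engine.streaming to org.apache.kylin.metadata.streaming. No call site changes, which implies the relocated StreamingConfig/StreamingManager keep the old API; under that assumption, a typical lookup still reads:

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;

public class RelocatedStreamingLookup {
    public static void main(String[] args) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        // same calls as before the refactor, only the package differs
        StreamingManager manager = StreamingManager.getInstance(config);
        StreamingConfig sc = manager.getStreamingConfig("MY_STREAMING_TABLE"); // hypothetical table name
        System.out.println(sc == null ? "not found" : sc.getName());
    }
}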
http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java index e49e882..7310d9c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; import org.springframework.beans.factory.annotation.Autowired; http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index d4cdfd5..e2100c4 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -43,12 +43,6 @@ <artifactId>kylin-core-common</artifactId> </dependency> - - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-streaming</artifactId> - </dependency> - <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java deleted file mode 100644 index 6981096..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.source.kafka; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - */ -class ByteBufferBackedInputStream extends InputStream { - - private ByteBuffer buf; - - public ByteBufferBackedInputStream(ByteBuffer buf) { - this.buf = buf; - } - - @Override - public int read() throws IOException { - if (!buf.hasRemaining()) { - return -1; - } - return buf.get() & 0xFF; - } - - @Override - public int read(byte[] bytes, int off, int len) throws IOException { - if (!buf.hasRemaining()) { - return -1; - } - - len = Math.min(len, buf.remaining()); - buf.get(bytes, off, len); - return len; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index d039583..208c0ce 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -20,7 +20,7 @@ package org.apache.kylin.source.kafka; import com.google.common.collect.Lists; import org.apache.kylin.engine.mr.IMRInput; -import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java deleted file mode 100644 index 78a67c2..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.source.kafka; - -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import com.google.common.base.Function; -import kafka.cluster.BrokerEndPoint; -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.streaming.IStreamingInput; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.engine.streaming.StreamingManager; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.source.kafka.util.KafkaRequester; -import org.apache.kylin.source.kafka.util.KafkaUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -import kafka.cluster.Broker; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.message.MessageAndOffset; - -import javax.annotation.Nullable; - -@SuppressWarnings("unused") -public class KafkaStreamingInput implements IStreamingInput { - - private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class); - - @Override - public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) { - if (realizationType != RealizationType.CUBE) { - throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType); - } - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName); - final String streaming = cube.getFactTable(); - final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); - final StreamingConfig streamingConfig = streamingManager.getConfig(streaming); - if (streamingConfig == null) { - throw new IllegalArgumentException("Table " + streaming + " is not a streaming table."); - } - if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) { - logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime)); - - try { - final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); - final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming); - List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns(); - - final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); - final ExecutorService executorService = Executors.newCachedThreadPool(); - final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList(); - for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) { - - final int partitionCount = 
KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size(); - for (int i = 0; i < partitionCount; ++i) { - final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser); - final Future<List<StreamingMessage>> future = executorService.submit(producer); - futures.add(future); - } - } - List<StreamingMessage> messages = Lists.newLinkedList(); - for (Future<List<StreamingMessage>> future : futures) { - try { - messages.addAll(future.get()); - } catch (InterruptedException e) { - logger.warn("this thread should not be interrupted, just ignore", e); - continue; - } catch (ExecutionException e) { - throw new RuntimeException("error when get StreamingMessages", e.getCause()); - } - } - final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime); - logger.info("finish to get streaming batch, total message count:" + messages.size()); - return new StreamingBatch(messages, timeRange); - } catch (ReflectiveOperationException e) { - throw new RuntimeException("failed to create instance of StreamingParser", e); - } - } else { - throw new IllegalArgumentException("kafka is the only supported streaming type."); - } - } - - private static class StreamingMessageProducer implements Callable<List<StreamingMessage>> { - - private final KafkaClusterConfig kafkaClusterConfig; - private final int partitionId; - private final StreamingParser streamingParser; - private final Pair<Long, Long> timeRange; - private final long margin; - - private List<Broker> replicaBrokers; - - StreamingMessageProducer(KafkaClusterConfig kafkaClusterConfig, int partitionId, Pair<Long, Long> timeRange, long margin, StreamingParser streamingParser) { - this.kafkaClusterConfig = kafkaClusterConfig; - this.partitionId = partitionId; - this.streamingParser = streamingParser; - this.margin = margin; - this.timeRange = timeRange; - this.replicaBrokers = kafkaClusterConfig.getBrokers(); - } - - private Broker getLeadBroker() { - final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig); - if (partitionMetadata != null) { - if (partitionMetadata.errorCode() != 0) { - logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode()); - } - replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() { - @Nullable - @Override - public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) { - return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT); - } - }); - BrokerEndPoint leaderEndpoint = partitionMetadata.leader(); - - return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT); - } else { - return null; - } - } - - @Override - public List<StreamingMessage> call() throws Exception { - List<StreamingMessage> result = Lists.newLinkedList(); - try { - long startTimestamp = timeRange.getFirst() - margin; - long offset = KafkaUtils.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, partitionId, startTimestamp, streamingParser); - int fetchRound = 0; - int consumeMsgCount = 0; - Broker leadBroker = null; - String topic = kafkaClusterConfig.getTopic(); - while (true) { - boolean outOfMargin = false; - int consumeMsgCountAtBeginning = consumeMsgCount; - fetchRound++; - - if (leadBroker == null) { - leadBroker = getLeadBroker(); - } - - if (leadBroker == null) { - logger.warn("cannot find lead broker, wait 5s"); - Thread.sleep(5000); - continue; - } - - 
logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString()); - - final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig); - if (fetchResponse.errorCode(topic, partitionId) != 0) { - logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId)); - Thread.sleep(30000); - continue; - } - - for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) { - offset++; - consumeMsgCount++; - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); - streamingMessage.setOffset(messageAndOffset.offset()); - if (streamingParser.filter(streamingMessage)) { - final long timestamp = streamingMessage.getTimestamp(); - if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) { - result.add(streamingMessage); - } else if (timestamp < timeRange.getSecond() + margin) { - //do nothing - } else { - logger.info("thread:" + Thread.currentThread() + " message timestamp:" + timestamp + " is out of time range:" + timeRange + " margin:" + margin); - outOfMargin = true; - break; - } - } - } - logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound); - if (outOfMargin) { - break; - } - if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing this round - logger.info("no message consumed this round, wait 30s"); - Thread.sleep(30000); - } - } - } catch (InterruptedException e) { - logger.warn("this thread should not be interrupted, just stop fetching", e); - } - return result; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index e4c702d..633a30c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java deleted file mode 100644 index b1b4011..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.diagnose; - -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Maps; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractApplication; -import org.apache.kylin.common.util.DaemonThreadFactory; -import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.StreamingParser; -import org.apache.kylin.source.kafka.TimedJsonStreamParser; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.source.kafka.util.KafkaRequester; -import org.apache.kylin.source.kafka.util.KafkaUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -import kafka.api.OffsetRequest; -import kafka.cluster.Broker; -import kafka.javaapi.FetchResponse; -import kafka.message.MessageAndOffset; - -/** - * Continuously run this as a daemon to discover how "disordered" the kafka queue is. 
- * This daemon only stores a digest so it should not be space-consuming - */ -public class KafkaInputAnalyzer extends AbstractApplication { - - public class KafkaMessagePuller implements Runnable { - - private final String topic; - private final int partitionId; - private final KafkaClusterConfig streamingConfig; - private final LinkedBlockingQueue<StreamingMessage> streamQueue; - private final StreamingParser streamingParser; - private final Broker leadBroker; - private long offset; - - protected final Logger logger; - - public KafkaMessagePuller(int clusterID, String topic, int partitionId, long startOffset, Broker leadBroker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) { - this.topic = topic; - this.partitionId = partitionId; - this.streamingConfig = kafkaClusterConfig; - this.offset = startOffset; - this.logger = LoggerFactory.getLogger(topic + "_cluster_" + clusterID + "_" + partitionId); - this.streamQueue = new LinkedBlockingQueue<StreamingMessage>(10000); - this.streamingParser = streamingParser; - this.leadBroker = leadBroker; - } - - public BlockingQueue<StreamingMessage> getStreamQueue() { - return streamQueue; - } - - @Override - public void run() { - try { - int consumeMsgCount = 0; - int fetchRound = 0; - while (true) { - int consumeMsgCountAtBeginning = consumeMsgCount; - fetchRound++; - - logger.info("fetching topic {} partition id {} offset {} leader {}", topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString()); - - final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig); - if (fetchResponse.errorCode(topic, partitionId) != 0) { - logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId)); - Thread.sleep(30000); - continue; - } - - for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) { - offset++; - consumeMsgCount++; - - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); - streamingMessage.setOffset(messageAndOffset.offset()); - if (streamingParser.filter(streamingMessage)) { - streamQueue.add(streamingMessage); - } - - } - logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound); - - if (consumeMsgCount == consumeMsgCountAtBeginning) {//nothing consumed this round - Thread.sleep(30000); - } - } - } catch (Exception e) { - logger.error("consumer has encountered an error", e); - } - } - - } - - @SuppressWarnings("static-access") - private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming").create("streaming"); - @SuppressWarnings("static-access") - private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("get delay or get disorder degree").create("task"); - @SuppressWarnings("static-access") - private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("field name of the ts").create("tsColName"); - - private static final Logger logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class); - - private StreamingParser parser; - private KafkaConfig kafkaConfig; - - private Options options; - - public KafkaInputAnalyzer() { - options = new Options(); - options.addOption(OPTION_STREAMING); - options.addOption(OPTION_TASK); - 
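// tsColName names the JSON field holding each message's event timestamp; - // execute() hands it to TimedJsonStreamParser via PROPERTY_TS_COLUMN_NAME. - 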
options.addOption(OPTION_TSCOLNAME); - - } - - private List<BlockingQueue<StreamingMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) { - List<BlockingQueue<StreamingMessage>> result = Lists.newArrayList(); - for (int partitionId = 0; partitionId < partitionCount; ++partitionId) { - final kafka.cluster.Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId); - long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig); - logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId); - KafkaMessagePuller consumer = new KafkaMessagePuller(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, leadBroker, kafkaClusterConfig, parser); - Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer); - result.add(consumer.getStreamQueue()); - } - return result; - } - - private List<BlockingQueue<StreamingMessage>> consumeAll(long whichtime) { - int clusterId = 0; - final List<BlockingQueue<StreamingMessage>> queues = Lists.newLinkedList(); - - for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) { - final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size(); - final List<BlockingQueue<StreamingMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime); - queues.addAll(oneClusterQueue); - logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size()); - clusterId++; - } - return queues; - } - - private void analyzeLatency() throws InterruptedException { - long[] intervals = new long[] { 1, 5, 60, 300, 1800 }; - final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime()); - final List<TimeHistogram> allHistograms = Lists.newArrayList(); - final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall"); - - ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory()); - for (int i = 0; i < allPartitionData.size(); ++i) { - final int index = i; - allHistograms.add(new TimeHistogram(intervals, "" + i)); - executorService.submit(new Runnable() { - @Override - public void run() { - while (true) { - try { - StreamingMessage message = allPartitionData.get(index).take(); - long t = message.getTimestamp(); - allHistograms.get(index).processMillis(System.currentTimeMillis() - t); - overallHistogram.processMillis(System.currentTimeMillis() - t); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - }); - } - - while (true) { - System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime())); - - for (TimeHistogram histogram : allHistograms) { - histogram.printStatus(); - } - overallHistogram.printStatus(); - Thread.sleep(300000); - } - } - - private void analyzeDisorder() throws InterruptedException { - final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime()); - - final List<Long> wallClocks = Lists.newArrayList(); - final List<Long> wallOffset = Lists.newArrayList(); - final List<Long> maxDisorderTime = Lists.newArrayList(); - final List<Long> maxDisorderOffset = Lists.newArrayList(); - final List<Long> processedMessages = Lists.newArrayList(); - - for (int i = 0; i < 
allPartitionData.size(); i++) { - wallClocks.add(0L); - wallOffset.add(0L); - maxDisorderTime.add(0L); - maxDisorderOffset.add(0L); - processedMessages.add(0L); - } - - ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory()); - final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size()); - for (int i = 0; i < allPartitionData.size(); ++i) { - final int index = i; - executorService.submit(new Runnable() { - @Override - public void run() { - try { - while (true) { - StreamingMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS); - if (message == null) { - System.out.println(String.format("Thread %d is exiting", index)); - break; - } - long t = message.getTimestamp(); - long offset = message.getOffset(); - if (t < wallClocks.get(index)) { - maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index))); - maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index))); - } else { - wallClocks.set(index, t); - wallOffset.set(index, offset); - } - processedMessages.set(index, processedMessages.get(index) + 1); - - if (processedMessages.get(index) % 10000 == 1) { - System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d", // - index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index))); - } - } - - System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d", // - index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index))); - countDownLatch.countDown(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }); - } - - countDownLatch.await(); - } - - @Override - protected Options getOptions() { - return options; - } - - @Override - protected void execute(OptionsHelper optionsHelper) throws Exception { - - String streaming = optionsHelper.getOptionValue(OPTION_STREAMING); - String task = optionsHelper.getOptionValue(OPTION_TASK); - String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME); - - Map<String, String> properties = Maps.newHashMap(); - properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName); - kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming); - parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties); - - if ("disorder".equalsIgnoreCase(task)) { - analyzeDisorder(); - } else if ("delay".equalsIgnoreCase(task)) { - analyzeLatency(); - } else { - optionsHelper.printUsage(this.getClass().getName(), options); - } - } - - public static void main(String[] args) { - KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer(); - analyzer.execute(args); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5aee0226/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java deleted file mode 100644 index 6a456bc..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.diagnose; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; - -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.MapType; -import com.fasterxml.jackson.databind.type.SimpleType; - -/** - * only for verifying Kylin streaming's correctness by comparing to data in the original Kafka topic - */ -public class KafkaVerify { - - public static void main(String[] args) throws IOException { - - System.out.println("start"); - - ObjectMapper mapper = new ObjectMapper(); - JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); - - long start = Long.valueOf(args[0]); - long end = Long.valueOf(args[1]); - long interval = Long.valueOf(args[2]); - int bucket = (int) ((end - start + interval - 1) / interval); - - long[] qtySum = new long[bucket]; - long qtyTotal = 0; - long[] counts = new long[bucket]; - long countTotal = 0; - long processed = 0; - long minOffset = -1; - long maxOffset = -1; - - try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) { - String s; - while ((s = br.readLine()) != null) { - // process the line. - if (++processed % 10000 == 1) { - System.out.println("processing " + processed); - } - - Map<String, String> root = mapper.readValue(s, mapType); - String tsStr = root.get("sys_ts"); - - if (StringUtils.isEmpty(tsStr)) { - continue; - } - long ts = Long.valueOf(tsStr); - if (ts < start || ts >= end) { - continue; - } - - if (minOffset == -1) { - minOffset = processed - 1; - } - maxOffset = processed - 1; - - long qty = Long.valueOf(root.get("qty")); - int index = (int) ((ts - start) / interval); - qtySum[index] += qty; - qtyTotal += qty; - counts[index]++; - countTotal++; - } - } - - System.out.println("qty sum is " + Arrays.toString(qtySum)); - System.out.println("qty total is " + qtyTotal); - System.out.println("count is " + Arrays.toString(counts)); - System.out.println("count total is " + countTotal); - System.out.println("first processed is " + minOffset); - System.out.println("last processed is " + maxOffset); - } -}
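All three classes deleted above ride on Kafka's legacy SimpleConsumer path: they discover the lead broker per partition (KafkaRequester/KafkaUtils), issue raw fetch requests, and advance offsets by hand. The sketch below shows how the same bounded time-window read can be expressed against the modern KafkaConsumer API (Kafka 0.10.1+); the class and method names are illustrative only and are not part of this commit.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class TimeWindowReadSketch {

        // Reads messages whose timestamps fall in [startTime, endTime) from one
        // partition, tolerating disorder up to `margin` ms, as the deleted
        // StreamingMessageProducer did with SimpleConsumer primitives.
        public static List<String> read(String brokers, String topic, int partition,
                                        long startTime, long endTime, long margin) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            List<String> result = new ArrayList<String>();
            KafkaConsumer<byte[], String> consumer = new KafkaConsumer<byte[], String>(props);
            try {
                TopicPartition tp = new TopicPartition(topic, partition);
                consumer.assign(Collections.singletonList(tp));

                // The broker locates the first offset at or after (startTime - margin);
                // the deleted code did this with a manual search over fetched payloads.
                Map<TopicPartition, OffsetAndTimestamp> found =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, startTime - margin));
                OffsetAndTimestamp oat = found.get(tp);
                if (oat == null) {
                    return result; // no message at or after the requested start
                }
                consumer.seek(tp, oat.offset());

                boolean pastWindow = false;
                while (!pastWindow) {
                    ConsumerRecords<byte[], String> records = consumer.poll(5000L);
                    if (records.isEmpty()) {
                        break; // a production reader would retry, as the deleted loop did
                    }
                    for (ConsumerRecord<byte[], String> record : records) {
                        long ts = record.timestamp();
                        if (ts >= startTime && ts < endTime) {
                            result.add(record.value());
                        } else if (ts >= endTime + margin) {
                            // same exit rule as the deleted loop: a message this far
                            // past the window means the window is complete
                            pastWindow = true;
                            break;
                        }
                    }
                }
            } finally {
                consumer.close();
            }
            return result;
        }
    }

One semantic difference worth noting: offsetsForTimes() consults broker-recorded message timestamps, whereas the deleted KafkaUtils.findClosestOffsetWithDataTimestamp() searched on timestamps parsed out of the message payload, so the margin around the window still matters for late or disordered data (exactly the disorder KafkaInputAnalyzer was built to measure).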