KYLIN-1762 fix query test error Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab5563a8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab5563a8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab5563a8
Branch: refs/heads/KYLIN-1726-2 Commit: ab5563a8ec060fba48ec8f43244bed6f887b0e83 Parents: be18158 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Sep 25 21:41:37 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 27 10:17:40 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/streaming/Kafka10DataLoader.java | 2 +- .../cube/model/CubeJoinedFlatTableDesc.java | 2 +- .../mr/steps/FactDistinctColumnPartitioner.java | 3 ++ .../engine/mr/steps/FactDistinctColumnsJob.java | 2 +- .../mr/steps/FactDistinctColumnsReducer.java | 38 +++++++++----- .../mr/steps/FactDistinctHiveColumnsMapper.java | 49 ++++++++++++++++-- .../kafka/DEFAULT.STREAMING_TABLE.json | 1 + .../kylin/provision/BuildCubeWithStream.java | 52 ++++++++++++-------- .../kylin/provision/BuildCubeWithStream2.java | 4 +- .../apache/kylin/query/ITKylinQueryTest.java | 3 ++ .../org/apache/kylin/query/KylinTestBase.java | 2 +- .../apache/kylin/source/kafka/KafkaMRInput.java | 2 +- .../source/kafka/TimedJsonStreamParser.java | 7 +-- 13 files changed, 117 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java index 2b299cc..8c548be 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java @@ -65,7 +65,7 @@ public class Kafka10DataLoader extends StreamDataLoader { props.put("retry.backoff.ms", "1000"); KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props); - for (int i = 0; i < messages.size(); ++i) { + for (int i = 0; i < messages.size(); i++) { ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); producer.send(keyedMessage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index 6ca89c8..5cd4f1d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -143,7 +143,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { public int getColumnIndex(TblColRef colRef) { Integer index = columnIndexMap.get(colRef); if (index == null) - throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table."); + return -1; return index.intValue(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index a631cf4..6973c4b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -34,6 +34,9 @@ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> { if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) { // the last reducer is for merging hll return numReduceTasks - 1; + } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) { + // the last reducer is for merging hll + return numReduceTasks - 2; } else { int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1); return colIndex; http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index a6c4d30..a9cc17f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { System.out.println("Found segment " + segment); } setupMapper(cube.getSegmentById(segmentID)); - setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 1 : columnsNeedDict.size()); + setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size()); attachKylinPropsAndMetadata(cube, job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 0c13df7..2889ba8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -65,7 +65,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri private List<ByteArray> colValues; private TblColRef col = null; private boolean isStatistics = false; - private boolean outputTouched = false; + private boolean isPartitionCol = false; private KylinConfig cubeConfig; protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); @@ -92,25 +92,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri baseCuboidRowCountInMappers = Lists.newArrayList(); cuboidHLLMap = Maps.newHashMap(); samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + } else if (collectStatistics && (taskId == numberOfTasks - 2)) { + // partition col + isStatistics = false; + isPartitionCol = true; + col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); + colValues = Lists.newLinkedList(); } else { // col isStatistics = false; + isPartitionCol = false; col = columnList.get(taskId); - colValues = Lists.newArrayList(); + colValues = Lists.newLinkedList(); } } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - if (isStatistics == false) { - colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); - if (colValues.size() == 1000000) { //spill every 1 million - logger.info("spill values to disk..."); - outputDistinctValues(col, colValues, context); - colValues.clear(); - } - } else { + if (isStatistics == true) { // for hll long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); for (Text value : values) { @@ -130,6 +130,21 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri cuboidHLLMap.put(cuboidId, hll); } } + } else if (isPartitionCol == true) { + // for partition col min/max value + ByteArray value = new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)); + if (colValues.size() > 1) { + colValues.set(1, value); + } else { + colValues.add(value); + } + } else { + colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); + if (colValues.size() == 1000000) { //spill every 1 million + logger.info("spill values to disk..."); + outputDistinctValues(col, colValues, context); + colValues.clear(); + } } } @@ -156,7 +171,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri } } finally { IOUtils.closeQuietly(out); - outputTouched = true; } } @@ -164,7 +178,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri protected void cleanup(Context context) throws IOException, InterruptedException { if (isStatistics == false) { - if (!outputTouched || colValues.size() > 0) { + if (colValues.size() > 0) { outputDistinctValues(col, colValues, context); colValues.clear(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 5e278f8..86ef487 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; +import org.apache.kylin.metadata.model.TblColRef; /** */ @@ -52,8 +53,12 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap private ByteArray[] row_hashcodes = null; private ByteBuffer keyBuffer; private static final Text EMPTY_TEXT = new Text(); + public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; public static final byte MARK_FOR_HLL = (byte) 0xFF; + private int partitionColumnIndex = -1; + private boolean needFetchPartitionCol = true; + @Override protected void setup(Context context) throws IOException { super.setup(context); @@ -81,6 +86,26 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap for (int i = 0; i < nRowKey; i++) { row_hashcodes[i] = new ByteArray(); } + + TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); + if (partitionColRef != null) { + partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); + } + + // check whether need fetch the partition col values + if (partitionColumnIndex < 0) { + // if partition col not on cube, no need + needFetchPartitionCol = false; + } else { + for (int x : dictionaryColumnIndex) { + if (x == partitionColumnIndex) { + // if partition col already build dict, no need + needFetchPartitionCol = false; + break; + } + } + } + } } @@ -108,24 +133,38 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap @Override public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException { String[] row = flatTableInputFormat.parseMapperInput(record); + + keyBuffer.clear(); try { for (int i = 0; i < factDictCols.size(); i++) { String fieldValue = row[dictionaryColumnIndex[i]]; if (fieldValue == null) continue; - - keyBuffer.clear(); + int offset = keyBuffer.position(); keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough keyBuffer.put(Bytes.toBytes(fieldValue)); - outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); + outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); context.write(outputKey, EMPTY_TEXT); } } catch (Exception ex) { handleErrorRecord(row, ex); } - if (collectStatistics && rowCount < samplingPercentage) { - putRowKeyToHLL(row); + if (collectStatistics) { + if (rowCount < samplingPercentage) { + putRowKeyToHLL(row); + } + + if (needFetchPartitionCol == true) { + String fieldValue = row[partitionColumnIndex]; + if (fieldValue != null) { + int offset = keyBuffer.position(); + keyBuffer.put(MARK_FOR_PARTITION_COL); + keyBuffer.put(Bytes.toBytes(fieldValue)); + outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); + context.write(outputKey, EMPTY_TEXT); + } + } } if (rowCount++ == 100) http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json index 6a64cce..e3ac2d6 100644 --- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json +++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json @@ -6,6 +6,7 @@ "timeout": 60000, "bufferSize": 65536, "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser", + "parserProperties": "tsColName=timestamp", "last_modified": 0, "clusters": [ { http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index bfe1d0a..dfcedfb 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -69,10 +69,19 @@ public class BuildCubeWithStream { private KafkaConfig kafkaConfig; private MockKafka kafkaServer; + protected static boolean fastBuildMode = false; public void before() throws Exception { deployEnv(); + String fastModeStr = System.getProperty("fastBuildMode"); + if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { + fastBuildMode = true; + logger.info("Will use fast build mode"); + } else { + logger.info("Will not use fast build mode"); + } + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); jobService = ExecutableManager.getInstance(kylinConfig); scheduler = DefaultScheduler.createInstance(); @@ -139,29 +148,32 @@ public class BuildCubeWithStream { generateStreamData(date1, date2, numberOfRecrods1); ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE); Assert.assertTrue(result == ExecutableState.SUCCEED); - long date3 = f.parse("2013-04-01").getTime(); - int numberOfRecords2 = 5000; - generateStreamData(date2, date3, numberOfRecords2); - result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.SUCCEED); - //empty build - result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.DISCARDED); + if (fastBuildMode == false) { + long date3 = f.parse("2013-04-01").getTime(); + int numberOfRecords2 = 5000; + generateStreamData(date2, date3, numberOfRecords2); + result = buildSegment(cubeName, 0, Long.MAX_VALUE); + Assert.assertTrue(result == ExecutableState.SUCCEED); - //merge - result = mergeSegment(cubeName, 0, 15000); - Assert.assertTrue(result == ExecutableState.SUCCEED); + //empty build + result = buildSegment(cubeName, 0, Long.MAX_VALUE); + Assert.assertTrue(result == ExecutableState.DISCARDED); + + //merge + result = mergeSegment(cubeName, 0, 15000); + Assert.assertTrue(result == ExecutableState.SUCCEED); - List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(); - Assert.assertTrue(segments.size() == 1); + List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(); + Assert.assertTrue(segments.size() == 1); - CubeSegment toRefreshSeg = segments.get(0); - HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo(); + CubeSegment toRefreshSeg = segments.get(0); + HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo(); - refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap); - segments = cubeManager.getCube(cubeName).getSegments(); - Assert.assertTrue(segments.size() == 1); + refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap); + segments = cubeManager.getCube(cubeName).getSegments(); + Assert.assertTrue(segments.size() == 1); + } } @@ -197,8 +209,8 @@ public class BuildCubeWithStream { protected void deployEnv() throws IOException { DeployUtil.overrideJobJarLocations(); - //DeployUtil.initCliWorkDir(); - //DeployUtil.deployMetadata(); +// DeployUtil.initCliWorkDir(); +// DeployUtil.deployMetadata(); } public static void beforeClass() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java index 7959701..d8c857f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java @@ -45,7 +45,7 @@ import static java.lang.Thread.sleep; public class BuildCubeWithStream2 extends BuildCubeWithStream { private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class); - private static boolean generateData = true; + private boolean generateData = true; @Override public void build() throws Exception { @@ -76,6 +76,7 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream { List<FutureTask<ExecutableState>> futures = Lists.newArrayList(); for (int i = 0; i < 5; i++) { + Thread.sleep(2 * 60 * 1000); // sleep 2 mintues FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() { @Override public ExecutableState call() { @@ -92,7 +93,6 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream { executorService.submit(futureTask); futures.add(futureTask); - Thread.sleep(2 * 60 * 1000); // sleep 2 mintues } generateData = false; // stop generating message to kafka http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 59a3a04..93d47f1 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -24,6 +24,7 @@ import java.io.File; import java.sql.SQLException; import java.util.List; import java.util.Map; +import java.util.TimeZone; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -65,6 +66,8 @@ public class ITKylinQueryTest extends KylinTestBase { RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]"); RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]"); + + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); } @AfterClass http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index d0bcf52..57c4f4d 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -586,7 +586,7 @@ public class KylinTestBase { //setup cube conn File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config); Properties props = new Properties(); - props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "15001"); + props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "20001"); cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props); //setup h2 http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index a5f678f..729719a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -118,7 +118,7 @@ public class KafkaMRInput implements IMRInput { } } Text text = (Text) mapperInput; - ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice(); + ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()); StreamingMessage streamingMessage = streamingParser.parse(buffer); return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index d3530f1..148ae25 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -47,7 +47,6 @@ public final class TimedJsonStreamParser extends StreamingParser { private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class); private List<TblColRef> allColumns; - private boolean formatTs = false;//not used private final ObjectMapper mapper = new ObjectMapper(); private String tsColName = "timestamp"; private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); @@ -61,9 +60,6 @@ public final class TimedJsonStreamParser extends StreamingParser { String[] parts = prop.split("="); if (parts.length == 2) { switch (parts[0]) { - case "formatTs": - this.formatTs = Boolean.valueOf(parts[1]); - break; case "tsColName": this.tsColName = parts[1]; break; @@ -78,7 +74,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } } - logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName); + logger.info("TimedJsonStreamParser with tsColName {}", tsColName); } @Override @@ -105,7 +101,6 @@ public final class TimedJsonStreamParser extends StreamingParser { } } - logger.info("Streaming Message: " + result.toString()); return new StreamingMessage(result, 0, t, Collections.<String, Object> emptyMap()); } catch (IOException e) { logger.error("error", e);