This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 88205b9 fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns 88205b9 is described below commit 88205b94bf5fb7451e01dd95fc08b35aade04570 Author: liukun4515 <liu...@apache.org> AuthorDate: Sat Feb 6 20:00:28 2021 +0800 fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns --- .../apache/kylin/storage/gtrecord/CubeTupleConverter.java | 12 +++++++----- .../kylin/stream/core/query/StreamingTupleConverter.java | 12 ++++++++---- .../kylin/stream/source/kafka/TimedJsonStreamParser.java | 9 ++++++++- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 9911b51..5595f5a 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -71,7 +71,7 @@ public class CubeTupleConverter implements ITupleConverter { public final List<Integer> advMeasureIndexInGTValues; private List<ILookupTable> usedLookupTables; - final Set<Integer> timestampColumn = new HashSet<>(); + final Set<Integer> needAdjustTimeColumns = new HashSet<>(); String eventTimezone; boolean autoJustByTimezone; private final long timeZoneOffset; @@ -113,9 +113,11 @@ public class CubeTupleConverter implements ITupleConverter { // pre-calculate dimension index mapping to tuple for (TblColRef dim : selectedDimensions) { tupleIdx[i] = tupleInfo.hasColumn(dim) ? 
tupleInfo.getColumnIndex(dim) : -1; - if (TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()) - && !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) { - timestampColumn.add(tupleIdx[i]); + // all time columns should be adjusted using timezone offset except derived column above day, + // such as DAY_START, WEEK_START, YEAR_START. + if (dim.getType().isDateTimeFamily() && + !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) { + needAdjustTimeColumns.add(tupleIdx[i]); } i++; } @@ -175,7 +177,7 @@ public class CubeTupleConverter implements ITupleConverter { int ti = tupleIdx[i]; if (ti >= 0) { // add offset to return result according to timezone - if (autoJustByTimezone && timestampColumn.contains(ti)) { + if (autoJustByTimezone && needAdjustTimeColumns.contains(ti)) { // For streaming try { String v = toString(gtValues[i]); diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java index a6a3da5..d820174 100755 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java @@ -51,7 +51,7 @@ public class StreamingTupleConverter { final int dimCnt; final int metricsCnt; final MeasureType<?>[] measureTypes; - final Set<Integer> timestampColumn = new HashSet<>(); + final Set<Integer> needAdjustTimeColumns = new HashSet<>(); final List<MeasureType.IAdvMeasureFiller> advMeasureFillers; final List<Integer> advMeasureIndexInGTValues; @@ -76,8 +76,12 @@ public class StreamingTupleConverter { // pre-calculate dimension index mapping to tuple for (TblColRef dim : schema.getDimensions()) { dimTupleIdx[idx] = tupleInfo.hasColumn(dim) ? 
tupleInfo.getColumnIndex(dim) : -1; - if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())) - timestampColumn.add(dimTupleIdx[idx]); + // all time columns should be adjusted using timezone offset except derived column above day, + // such as DAY_START, WEEK_START, YEAR_START. + if (dim.getType().isDateTimeFamily() && + !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) { + needAdjustTimeColumns.add(dimTupleIdx[idx]); + } idx++; } @@ -109,7 +113,7 @@ public class StreamingTupleConverter { for (int i = 0; i < dimCnt; i++) { int ti = dimTupleIdx[i]; if (ti >= 0) { - if (autoTimezone && timestampColumn.contains(ti)) { + if (autoTimezone && needAdjustTimeColumns.contains(ti)) { try { tuple.setDimensionValue(ti, Long.toString(Long.parseLong(dimValues[i]) + TIME_ZONE_OFFSET)); } catch (NumberFormatException nfe) { diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java index 4342715..1bf8046 100644 --- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java @@ -149,9 +149,16 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons String columnName = column.getName(); TimeDerivedColumnType columnType = TimeDerivedColumnType.getTimeDerivedColumnType(columnName); if (columnType != null) { - if (timeZoneOffset > 0 && TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) { + // For example: timezone is GMT+8 + // t = 1612506748660 and it represents 2021-02-05 14:32:28 GMT+8 or 2021-02-05 06:32:28 GMT+0. 
+ // The day_start of 2021-02-05 14:32:28 GMT+8 is 2021-02-05 00:00:00 GMT+8. + // t + timeZoneOffset represents 2021-02-05 14:32:28 GMT+0, and the normalized result is 2021-02-05 00:00:00 GMT+0. + // In the query converter, just convert the long to a time string using the GMT+0 timezone. + if (TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) { result.add(String.valueOf(columnType.normalize(t + timeZoneOffset))); } else { + // Before conversion in the query converter, the timezone offset should be added to the long, which is then + // converted to a time string using GMT+0. result.add(String.valueOf(columnType.normalize(t))); } } else {