This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3b8390784d [multistage] Support TIMESTAMP type and date ops functions (#11350) 3b8390784d is described below commit 3b8390784da62cd1da564e74afe93ca8dfef2d21 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Thu Aug 17 08:52:01 2023 -0700 [multistage] Support TIMESTAMP type and date ops functions (#11350) --- .../common/function/TransformFunctionType.java | 20 ++ .../common/request/context/LiteralContext.java | 6 +- .../org/apache/pinot/common/utils/DataSchema.java | 11 +- .../core/common/datablock/DataBlockBuilder.java | 13 +- .../function/ScalarTransformFunctionWrapper.java | 8 + .../integration/tests/custom/TimestampTest.java | 326 +++++++++++++++++++++ .../query/parser/CalciteRexExpressionParser.java | 5 + .../planner/logical/RelToPlanNodeConverter.java | 13 + .../pinot/query/planner/logical/RexExpression.java | 5 + .../query/planner/logical/RexExpressionUtils.java | 9 + .../pinot/query/planner/plannode/ValueNode.java | 12 + .../query/service/dispatch/QueryDispatcher.java | 9 +- .../pinot/query/runtime/QueryRunnerTestBase.java | 2 + 13 files changed, 427 insertions(+), 12 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java index f741ff223e..b27c6cd53b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java @@ -39,6 +39,11 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; +/** + * The {@code TransformFunctionType} enum represents all the transform functions supported by Calcite SQL parser in + * v2 engine. + * TODO: Add support for scalar functions auto registration. 
+ */ public enum TransformFunctionType { // arithmetic functions for single-valued columns ADD("add", "plus"), @@ -124,6 +129,21 @@ public enum TransformFunctionType { SqlTypeFamily.CHARACTER), ordinal -> ordinal > 1)), + FROMDATETIME("fromDateTime", ReturnTypes.TIMESTAMP_NULLABLE, + OperandTypes.family(ImmutableList.of(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER), + ordinal -> ordinal > 1)), + + TODATETIME("toDateTime", ReturnTypes.VARCHAR_2000_NULLABLE, + OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER), + ordinal -> ordinal > 1)), + + TIMESTAMPADD("timestampAdd", ReturnTypes.TIMESTAMP_NULLABLE, + OperandTypes.family(ImmutableList.of(SqlTypeFamily.CHARACTER, SqlTypeFamily.NUMERIC, SqlTypeFamily.ANY)), + "dateAdd"), + + TIMESTAMPDIFF("timestampDiff", ReturnTypes.BIGINT_NULLABLE, + OperandTypes.family(ImmutableList.of(SqlTypeFamily.CHARACTER, SqlTypeFamily.ANY, SqlTypeFamily.ANY)), "dateDiff"), + YEAR("year"), YEAR_OF_WEEK("yearOfWeek", "yow"), QUARTER("quarter"), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java index 3436e826f3..0fa430a0c7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/request/context/LiteralContext.java @@ -122,14 +122,14 @@ public class LiteralContext { Pair<FieldSpec.DataType, Object> typeAndValue = inferLiteralDataTypeAndValue(literal.getFieldValue().toString()); _type = typeAndValue.getLeft(); - _value = typeAndValue.getRight(); if (_type == FieldSpec.DataType.BIG_DECIMAL) { - _bigDecimalValue = (BigDecimal) _value; + _bigDecimalValue = (BigDecimal) typeAndValue.getRight(); } else if (_type == FieldSpec.DataType.TIMESTAMP) { - _bigDecimalValue = 
PinotDataType.TIMESTAMP.toBigDecimal(Timestamp.valueOf(_value.toString())); + _bigDecimalValue = PinotDataType.TIMESTAMP.toBigDecimal(typeAndValue.getRight()); } else { _bigDecimalValue = BigDecimal.ZERO; } + _value = literal.getFieldValue().toString(); break; case NULL_VALUE: _type = FieldSpec.DataType.UNKNOWN; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 282a3d7416..80c51fb846 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -353,7 +353,10 @@ public class DataSchema { case BOOLEAN: return ((Number) value).intValue() == 1; case TIMESTAMP: - return new Timestamp((long) value); + if (value instanceof Timestamp) { + return (Timestamp) value; + } + return new Timestamp(((Number) value).longValue()); case STRING: case JSON: return value.toString(); @@ -416,8 +419,14 @@ public class DataSchema { case BIG_DECIMAL: return (BigDecimal) value; case BOOLEAN: + if (value instanceof Boolean) { + return (boolean) value; + } return ((Number) value).intValue() == 1; case TIMESTAMP: + if (value instanceof Timestamp) { + return value.toString(); + } return new Timestamp((long) value).toString(); case STRING: case JSON: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java index 8b486b4012..8bce975c70 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java @@ -148,10 +148,19 @@ public class DataBlockBuilder { setColumn(rowBuilder, byteBuffer, (BigDecimal) value); break; case BOOLEAN: - byteBuffer.putInt(((Boolean) value) ? 
1 : 0); + if (value instanceof Boolean) { + byteBuffer.putInt(((Boolean) value) ? 1 : 0); + } else { + byteBuffer.putInt(((Number) value).intValue() > 0 ? 1 : 0); + } break; case TIMESTAMP: - byteBuffer.putLong(((Timestamp) value).getTime()); + // Certain non-strongly-typed functions in v2 might return a long value instead of a Timestamp. + if (value instanceof Long) { + byteBuffer.putLong((long) value); + } else { + byteBuffer.putLong(((Timestamp) value).getTime()); + } break; case STRING: setColumn(rowBuilder, byteBuffer, (String) value); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java index 3a2c35b90a..09f3c27421 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java @@ -117,10 +117,18 @@ public class ScalarTransformFunctionWrapper extends BaseTransformFunction { parameterTypes[i].convert(literalTransformFunction.getDoubleLiteral(), PinotDataType.DOUBLE); break; case BIG_DECIMAL: + if (parameterTypes[i] == PinotDataType.STRING) { + _scalarArguments[i] = literalTransformFunction.getStringLiteral(); + break; + } _scalarArguments[i] = parameterTypes[i].convert(literalTransformFunction.getBigDecimalLiteral(), PinotDataType.BIG_DECIMAL); break; case TIMESTAMP: + if (parameterTypes[i] == PinotDataType.STRING) { + _scalarArguments[i] = literalTransformFunction.getStringLiteral(); + break; + } _scalarArguments[i] = parameterTypes[i].convert(literalTransformFunction.getLongLiteral(), PinotDataType.TIMESTAMP); break; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java new file mode 100644 index 0000000000..c9995e3785 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.custom; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.sql.Timestamp; +import java.util.TimeZone; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.pinot.common.function.scalar.DateTimeFunctions; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +@Test(suiteName = "CustomClusterIntegrationTest") +public class TimestampTest extends CustomDataQueryClusterIntegrationTest { + + private static final String DEFAULT_TABLE_NAME = "TimestampTest"; + private static final String TIMESTAMP_1 = "ts1"; + private static final String TIMESTAMP_2 = "ts2"; + private static final String LONG_1 = "long1"; + private static final String LONG_2 = "long2"; + + private static final TimeZone DEFAULT_TIME_ZONE = TimeZone.getDefault(); + + @Override + protected long getCountStarResult() { + return 1000; + } + + @BeforeClass + public void setUpTimeZone() { + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + } + + @AfterClass + public void removeTimeZone() { + TimeZone.setDefault(DEFAULT_TIME_ZONE); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testSelectQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("SELECT ts1, ts2, long1,long2 FROM %s LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + long expectedTs1 = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss"); + long expectedTs2 = DateTimeFunctions.fromDateTime("2019-01-01 12:00:00", "yyyy-MM-dd 
HH:mm:ss"); + + for (int i = 0; i < getCountStarResult(); i++) { + String ts1 = jsonNode.get("resultTable").get("rows").get(i).get(0).asText(); + String ts2 = jsonNode.get("resultTable").get("rows").get(i).get(1).asText(); + long long1 = jsonNode.get("resultTable").get("rows").get(i).get(2).asLong(); + long long2 = jsonNode.get("resultTable").get("rows").get(i).get(3).asLong(); + assertEquals(ts1, new Timestamp(expectedTs1).toString()); + assertEquals(ts2, new Timestamp(expectedTs2).toString()); + assertEquals(long1, expectedTs1); + assertEquals(long2, expectedTs2); + expectedTs1 += 86400000; + expectedTs2 += 86400000; + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testSelectWithCastQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("\n" + + "SELECT CAST(DATETRUNC('DAY', CAST(FROMDATETIME(TODATETIME(FROMDATETIME(CAST(CAST(ts1 AS TIMESTAMP) AS " + + "VARCHAR), 'yyyy-MM-dd HH:mm:ss.S'), 'yyyy-MM-dd'), 'yyyy-MM-dd') AS TIMESTAMP), 'MILLISECONDS') AS " + + "TIMESTAMP) AS tdy_Calculation_2683863928708153344_ok\n" + + "FROM %s\n" + + "GROUP BY tdy_Calculation_2683863928708153344_ok\n" + + "ORDER BY tdy_Calculation_2683863928708153344_ok ASC\n" + + "LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + long expectedTs1 = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss"); + for (int i = 0; i < getCountStarResult(); i++) { + String ts1 = jsonNode.get("resultTable").get("rows").get(i).get(0).asText(); + assertEquals(ts1, new Timestamp(expectedTs1).toString()); + expectedTs1 += 86400000; + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testSelectWithCastAndFilterQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("\n" + + "SELECT CAST(DATETRUNC('DAY', 
CAST(FROMDATETIME(TODATETIME(FROMDATETIME(CAST(CAST(ts1 AS TIMESTAMP) AS " + + "VARCHAR), 'yyyy-MM-dd HH:mm:ss.S'), 'yyyy-MM-dd'), 'yyyy-MM-dd') AS TIMESTAMP), 'MILLISECONDS') AS " + + "TIMESTAMP) AS tdy_Calculation_2683863928708153344_ok\n" + + "FROM %s\n" + + "WHERE CAST(DATETRUNC('DAY', CAST(FROMDATETIME(TODATETIME(FROMDATETIME(CAST(CAST(ts1 AS TIMESTAMP) AS " + + "VARCHAR), 'yyyy-MM-dd HH:mm:ss.S'), 'yyyy-MM-dd'), 'yyyy-MM-dd') AS TIMESTAMP), 'MILLISECONDS') AS " + + "TIMESTAMP) = FROMDATETIME( '2019-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')\n", getTableName()); + JsonNode jsonNode = postQuery(query); + assertEquals(jsonNode.get("resultTable").get("rows").size(), 1); + assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText(), "2019-01-01 00:00:00.0"); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testTimeExtractFunction(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("\n" + + "SELECT HOUR(ts1), HOUR(ts2),\n" + + "MINUTE(ts1), MINUTE(ts2),\n" + + "SECOND(ts1), SECOND(ts2),\n" + + "MILLISECOND(ts1), MILLISECOND(ts2),\n" + + "YEAR(ts1), YEAR(ts2),\n" + + "YEAR_OF_WEEK(ts1), YEAR_OF_WEEK(ts2),\n" + + "MONTH_OF_YEAR(ts1), MONTH_OF_YEAR(ts2),\n" + + "WEEK_OF_YEAR(ts1), WEEK_OF_YEAR(ts2),\n" + + "DAY_OF_YEAR(ts1), DAY_OF_YEAR(ts2),\n" + + "DAY_OF_MONTH(ts1), DAY_OF_MONTH(ts2),\n" + + "DAY_OF_WEEK(ts1), DAY_OF_WEEK(ts2)\n" + + "FROM %s\n" + + "LIMIT %d\n", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asInt(), 0); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(1).asInt(), 12); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(2).asInt(), 0); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(3).asInt(), 0); + 
assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(4).asInt(), 0); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(5).asInt(), 0); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(6).asInt(), 0); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(7).asInt(), 0); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(8).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(9).asInt()); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(10).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(11).asInt()); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(12).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(13).asInt()); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(14).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(15).asInt()); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(16).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(17).asInt()); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(18).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(19).asInt()); + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(20).asInt(), + jsonNode.get("resultTable").get("rows").get(i).get(21).asInt()); + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testTimestampDiffQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("\n" + + "SELECT TIMESTAMPDIFF(second, ts1, ts2)\n" + + "FROM %s\n" + + "LIMIT %d\n", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asLong(), 43200); + } + + query = String.format("\n" + + "SELECT TIMESTAMPDIFF(minute, ts1, ts2)\n" + + "FROM %s\n" + + "LIMIT 
%d\n", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asLong(), 720); + } + + query = String.format("\n" + + "SELECT TIMESTAMPDIFF(hour, ts1, ts2)\n" + + "FROM %s\n" + + "LIMIT %d\n", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asLong(), 12); + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testTimestampAddQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("\n" + + "SELECT TIMESTAMPADD(MINUTE, 720, ts1), ts2\n" + + "FROM %s\n" + + "LIMIT %d\n", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + if (useMultiStageQueryEngine) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asText(), + jsonNode.get("resultTable").get("rows").get(i).get(1).textValue()); + } else { + assertEquals(new Timestamp(jsonNode.get("resultTable").get("rows").get(i).get(0).longValue()).toString(), + jsonNode.get("resultTable").get("rows").get(i).get(1).textValue()); + } + } + + query = String.format("\n" + + "SELECT TIMESTAMPADD(SECOND, 43200, ts1), ts2\n" + + "FROM %s\n" + + "LIMIT %d\n", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + if (useMultiStageQueryEngine) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asText(), + jsonNode.get("resultTable").get("rows").get(i).get(1).textValue()); + } else { + assertEquals(new Timestamp(jsonNode.get("resultTable").get("rows").get(i).get(0).longValue()).toString(), + jsonNode.get("resultTable").get("rows").get(i).get(1).textValue()); + } + } + + query = 
String.format("\n" + + "SELECT TIMESTAMPADD(HOUR, 12, ts1), ts2\n" + + "FROM %s\n" + + "LIMIT %d\n", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + for (int i = 0; i < getCountStarResult(); i++) { + if (useMultiStageQueryEngine) { + assertEquals(jsonNode.get("resultTable").get("rows").get(i).get(0).asText(), + jsonNode.get("resultTable").get("rows").get(i).get(1).textValue()); + } else { + assertEquals(new Timestamp(jsonNode.get("resultTable").get("rows").get(i).get(0).longValue()).toString(), + jsonNode.get("resultTable").get("rows").get(i).get(1).textValue()); + } + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testToDateTimeQueries(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = String.format("\n" + + "SELECT " + + ( + useMultiStageQueryEngine + ? "TODATETIME(CAST(MIN(ts1) AS BIGINT), 'yyyy-MM-dd HH:mm:ss'),\n" + : "TODATETIME(MIN(ts1), 'yyyy-MM-dd HH:mm:ss'),\n") + + ( + useMultiStageQueryEngine + ? 
"TODATETIME(CAST(MIN(ts2) AS BIGINT), 'yyyy-MM-dd HH:mm:ss')\n" + : "TODATETIME(MIN(ts2), 'yyyy-MM-dd HH:mm:ss')\n") + + "FROM %s\n", getTableName()); + JsonNode jsonNode = postQuery(query); + assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText(), "2019-01-01 00:00:00"); + assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).textValue(), "2019-01-01 12:00:00"); + } + + @Override + public String getTableName() { + return DEFAULT_TABLE_NAME; + } + + @Override + public Schema createSchema() { + return new Schema.SchemaBuilder().setSchemaName(getTableName()) + .addSingleValueDimension(TIMESTAMP_1, FieldSpec.DataType.TIMESTAMP) + .addSingleValueDimension(TIMESTAMP_2, FieldSpec.DataType.TIMESTAMP) + .addSingleValueDimension(LONG_1, FieldSpec.DataType.LONG) + .addSingleValueDimension(LONG_2, FieldSpec.DataType.LONG) + .build(); + } + + @Override + public File createAvroFile() + throws Exception { + // create avro schema + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + avroSchema.setFields(ImmutableList.of( + new org.apache.avro.Schema.Field(TIMESTAMP_1, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + null, null), + new org.apache.avro.Schema.Field(TIMESTAMP_2, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + null, null), + new org.apache.avro.Schema.Field(LONG_1, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, + null), + new org.apache.avro.Schema.Field(LONG_2, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, + null) + )); + + // create avro file + File avroFile = new File(_tempDir, "data.avro"); + try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { + fileWriter.create(avroSchema, avroFile); + long ts1 = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss"); + long ts2 = DateTimeFunctions.fromDateTime("2019-01-01 
12:00:00", "yyyy-MM-dd HH:mm:ss"); + + for (int i = 0; i < getCountStarResult(); i++) { + // create avro record + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(TIMESTAMP_1, ts1); + record.put(TIMESTAMP_2, ts2); + record.put(LONG_1, ts1); + record.put(LONG_2, ts2); + // add avro record to file + fileWriter.append(record); + ts1 += 86400000; + ts2 += 86400000; + } + } + return avroFile; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java index 4c31fc86f4..7e4fe191dd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.parser; import java.util.ArrayList; import java.util.Arrays; +import java.util.GregorianCalendar; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -35,6 +36,7 @@ import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.plannode.SortNode; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.sql.FilterKind; import org.apache.pinot.sql.parsers.SqlCompilationException; import org.slf4j.Logger; @@ -194,6 +196,9 @@ public class CalciteRexExpressionParser { private static Expression rexLiteralToExpression(RexExpression.Literal rexLiteral) { // TODO: currently literals are encoded as strings for V1, remove this and use directly literal type when it // supports strong-type in V1. 
+ if (rexLiteral.getDataType() == FieldSpec.DataType.TIMESTAMP) { + return RequestUtils.getLiteralExpression(((GregorianCalendar) rexLiteral.getValue()).getTimeInMillis()); + } return RequestUtils.getLiteralExpression(rexLiteral.getValue()); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index b0b7545677..7beaab8e42 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -217,6 +217,19 @@ public final class RelToPlanNodeConverter { case INTEGER: return isArray ? DataSchema.ColumnDataType.INT_ARRAY : DataSchema.ColumnDataType.INT; case BIGINT: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + case INTERVAL_MONTH: + case INTERVAL_YEAR: + case INTERVAL_YEAR_MONTH: return isArray ? 
DataSchema.ColumnDataType.LONG_ARRAY : DataSchema.ColumnDataType.LONG; case DECIMAL: return resolveDecimal(relDataType, isArray); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java index ab78924548..bacde0d30b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.logical; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.GregorianCalendar; import java.util.List; import java.util.stream.Collectors; import org.apache.calcite.rel.core.AggregateCall; @@ -62,6 +63,8 @@ public interface RexExpression { switch (rexCall.getKind()) { case CAST: return RexExpressionUtils.handleCast(rexCall); + case REINTERPRET: + return RexExpressionUtils.handleReinterpret(rexCall); case SEARCH: return RexExpressionUtils.handleSearch(rexCall); case CASE: @@ -99,6 +102,8 @@ public interface RexExpression { return ((BigDecimal) value).doubleValue(); case STRING: return ((NlsString) value).getValue(); + case TIMESTAMP: + return ((GregorianCalendar) value).getTimeInMillis(); default: return value; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java index 242291c280..d3bf8d5a3b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java @@ -62,6 +62,15 @@ public class RexExpressionUtils { "CAST", operands); } + /** + * Reinterpret is a pass-through function that does not change the type of the input. 
+ */ + static RexExpression handleReinterpret(RexCall rexCall) { + List<RexNode> operands = rexCall.getOperands(); + Preconditions.checkState(operands.size() == 1, "REINTERPRET takes only 1 argument"); + return RexExpression.toRexExpression(operands.get(0)); + } + // TODO: Add support for range filter expressions (e.g. a > 0 and a < 30) static RexExpression handleSearch(RexCall rexCall) { List<RexNode> operands = rexCall.getOperands(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java index 248af5d8a2..acb00487e8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java @@ -20,11 +20,14 @@ package org.apache.pinot.query.planner.plannode; import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.GregorianCalendar; import java.util.List; import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.serde.ProtoProperties; +import org.apache.pinot.spi.data.FieldSpec; public class ValueNode extends AbstractPlanNode { @@ -42,6 +45,15 @@ public class ValueNode extends AbstractPlanNode { for (List<RexLiteral> literalTuple : literalTuples) { List<RexExpression> literalRow = new ArrayList<>(); for (RexLiteral literal : literalTuple) { + if (literal == null) { + literalRow.add(null); + continue; + } + if (literal.getTypeName() == SqlTypeName.TIMESTAMP) { + GregorianCalendar tsLiteral = (GregorianCalendar) literal.getValue(); + literalRow.add(new RexExpression.Literal(FieldSpec.DataType.TIMESTAMP, tsLiteral.getTimeInMillis())); + continue; + } 
literalRow.add(RexExpression.toRexExpression(literal)); } _literalRows.add(literalRow); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 9eff0bc6c4..7b0ad3e3cb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -65,7 +65,6 @@ import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.trace.RequestContext; -import org.apache.pinot.spi.utils.ByteArray; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -251,6 +250,7 @@ public class QueryDispatcher { DataBlock dataBlock = transferableBlock.getDataBlock(); int numColumns = resultSchema.getColumnNames().length; int numRows = dataBlock.getNumberOfRows(); + DataSchema.ColumnDataType[] columnDataTypes = resultSchema.getColumnDataTypes(); List<Object[]> rows = new ArrayList<>(dataBlock.getNumberOfRows()); if (numRows > 0) { RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns]; @@ -268,11 +268,8 @@ public class QueryDispatcher { row[colId++] = null; } else { int colRef = field.left; - if (rawRow[colRef] instanceof ByteArray) { - row[colId++] = ((ByteArray) rawRow[colRef]).toHexString(); - } else { - row[colId++] = rawRow[colRef]; - } + DataSchema.ColumnDataType dataType = columnDataTypes[colId]; + row[colId++] = dataType.convertAndFormat(rawRow[colRef]); } } rows.add(row); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index 
c23e8f29fc..52f9c6b0b9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -228,6 +228,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { } else if (l instanceof String) { if (r instanceof byte[]) { return ((String) l).compareTo(BytesUtils.toHexString((byte[]) r)); + } else if (r instanceof Timestamp) { + return ((String) l).compareTo((r).toString()); } return ((String) l).compareTo((String) r); } else if (l instanceof Boolean) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org