This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new 0c0281dbaf [timeseries] Introducing integration tests for time series engine (#15960)
0c0281dbaf is described below

commit 0c0281dbafb51b2ced732ecfc499d2b82ee2c56e
Author: Shaurya Chaturvedi <shauryach...@gmail.com>
AuthorDate: Mon Jun 2 21:26:24 2025 -0700

    [timeseries] Introducing integration tests for time series engine (#15960)
---
 .../tests/ClusterIntegrationTestUtils.java         |   4 +
 .../pinot/integration/tests/ClusterTest.java       |  27 ++
 pinot-integration-tests/pom.xml                    |   8 +
 .../tests/TimeSeriesIntegrationTest.java           | 305 +++++++++++++++++++++
 pom.xml                                            |   5 +
 5 files changed, 349 insertions(+)

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index e53c2ce300..12dc965d62 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -880,6 +880,10 @@ public class ClusterIntegrationTestUtils {
     return useMultiStageQueryEngine ? brokerBaseApiUrl + "/query" : brokerBaseApiUrl + "/query/sql";
   }
 
+  public static String getTimeSeriesQueryApiUrl(String timeSeriesBaseApiUrl) {
+    return timeSeriesBaseApiUrl + "/timeseries/api/v1/query_range";
+  }
+
   public static String getBrokerQueryCancelUrl(String brokerBaseApiUrl, String brokerId, String clientQueryId) {
     return brokerBaseApiUrl + "/clientQuery/" + brokerId + "/" + clientQueryId;
   }
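With the query parameters that ClusterTest#getTimeseriesQuery (below) appends to this
base URL, a fully rendered request looks roughly like the following; the host, port, and
the URL-encoded query are illustrative, not taken from the diff:

    http://localhost:8099/timeseries/api/v1/query_range?language=m3ql&query=fetch%7B...%7D%20%7C%20sum%7BdeviceOs%7D&start=1747007940&end=1747008300
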
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 6f8e86be0a..e1fc48bdb0 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -48,6 +48,7 @@ import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.NameValuePair;
 import org.apache.hc.core5.http.message.BasicHeader;
 import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.client.ConnectionFactory;
@@ -87,6 +88,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Listeners;
 
 import static org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
+import static org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getTimeSeriesQueryApiUrl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -549,6 +551,22 @@ public abstract class ClusterTest extends ControllerTest {
     return queryBrokerHttpEndpoint(query);
   }
 
+  /**
+   * Queries the broker's timeseries query endpoint (/timeseries/api/v1/query_range).
+   * This is used for testing timeseries queries.
+   */
+  public JsonNode getTimeseriesQuery(String query, long startTime, long endTime) {
+    try {
+      Map<String, String> queryParams = Map.of("language", "m3ql", "query", query, "start",
+          String.valueOf(startTime), "end", String.valueOf(endTime));
+      String url = buildQueryUrl(getTimeSeriesQueryApiUrl(getBrokerBaseApiUrl()), queryParams);
+      JsonNode responseJsonNode = JsonUtils.stringToJsonNode(sendGetRequest(url, Map.of()));
+      return sanitizeResponse(responseJsonNode);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get timeseries query: " + query, e);
+    }
+  }
+
   /**
    * Queries the broker's query endpoint (/query/sql)
    */
@@ -838,4 +856,13 @@ public abstract class ClusterTest extends ControllerTest {
       throw new SkipException("Some queries fail when using multi-stage engine");
     }
   }
+
+  private static String buildQueryUrl(String baseUrl, Map<String, String> params) throws Exception {
+    URIBuilder builder = new URIBuilder(baseUrl);
+    for (Map.Entry<String, String> entry : params.entrySet()) {
+      builder.addParameter(entry.getKey(), entry.getValue());
+    }
+    URI uri = builder.build();
+    return uri.toString();
+  }
 }
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 4ca5268cba..8e76f3ea6d 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -177,6 +177,14 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-segment-writer-file-based</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-timeseries-m3ql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-timeseries-planner</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
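
A minimal usage sketch for the new helper, assuming a test class that extends ClusterTest
and reuses the query grammar from the new test file below (the table name and the time
window are illustrative):

    JsonNode response = getTimeseriesQuery(
        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"ts\",ts_unit=\"MILLISECONDS\",value=\"totalTrips\"}"
            + " | sum{deviceOs}",
        1747007940L, 1747008300L);
    assertEquals(response.get("status").asText(), "success");
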
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
new file mode 100644
index 0000000000..c6e21bc486
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.util.TestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TimeSeriesIntegrationTest extends BaseClusterIntegrationTest {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesIntegrationTest.class);
+  private static final String TS_COLUMN = "ts";
+  private static final String DAYS_SINCE_FIRST_TRIP_COLUMN = "daysSinceFirstTrip";
+  private static final String DEVICE_OS_COLUMN = "deviceOs";
+  private static final String REFERRAL_COLUMN = "referralCol";
+  private static final String TOTAL_TRIPS_COLUMN = "totalTrips";
+
+  private static final String[] DEVICES = new String[]{"windows", "android", "ios"};
+  private static final long NUMBER_OF_ROWS = 1000L;
+  private static final long VIEWS_MIN_VALUE = 20L;
+  private static final long VIEWS_MAX_VALUE = 30L;
+  private static final long DATA_START_TIME_SEC = 1747008000L;
+  private static final long QUERY_START_TIME_SEC = DATA_START_TIME_SEC - 60; // 1 minute before start time
+  private static final long QUERY_END_TIME_SEC = DATA_START_TIME_SEC + 300; // 5 minutes after start time
+
+  @Test
+  public void testGroupByMax() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+            + " | max{%s} | transformNull{0} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  @Test
+  public void testGroupByMin() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+            + " | min{%s} | transformNull{0} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 5, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : VIEWS_MIN_VALUE)
+    );
+  }
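+
+  // For reference, after String.format substitution the testGroupByMax pipeline above
+  // expands to the following M3QL (wrapped here for readability):
+  //
+  //   fetch{table="mytable_OFFLINE",filter="",ts_column="ts",ts_unit="MILLISECONDS",value="totalTrips"}
+  //     | max{deviceOs} | transformNull{0} | keepLastValue{}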
+
+  @Test
+  public void testGroupBySum() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+            + " | sum{%s} | transformNull{0} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, REFERRAL_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 2, (ts, val, row) -> {
+      String referral = row.get("metric").get(REFERRAL_COLUMN).asText();
+      // Rows with referral = true ("1") carry VIEWS_MIN_VALUE (20), and rows with
+      // referral = false carry VIEWS_MAX_VALUE (30). Referral alternates once per second,
+      // so each 1-minute bucket sums 30 rows per group.
+      long expected = ts <= DATA_START_TIME_SEC ? 0L
+          : "1".equals(referral) ? 30 * VIEWS_MIN_VALUE : 30 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByTwoColumnsAndExpressionValue() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s*10\"}"
+            + " | max{%s,%s} | transformNull{0} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 15, (ts, val, row) -> {
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 10 * VIEWS_MAX_VALUE;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByThreeColumnsAndConstantValue() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+            + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 30, (ts, val, row) -> {
+      // With one row per second spread evenly across the 30 groups, each group
+      // contributes 2 rows per 1-minute bucket.
+      long expected = ts <= DATA_START_TIME_SEC ? 0L : 2L;
+      assertEquals(val, expected);
+    });
+  }
+
+  @Test
+  public void testGroupByWithFilter() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"%s='windows'\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"1\"}"
+            + " | sum{%s,%s,%s} | transformNull{0} | keepLastValue{}",
+        DEVICE_OS_COLUMN, TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN, DAYS_SINCE_FIRST_TRIP_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 10, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 0L : 2L)
+    );
+  }
+
+  @Test
+  public void testTransformNull() {
+    String query = String.format(
+        "fetch{table=\"mytable_OFFLINE\",filter=\"\",ts_column=\"%s\",ts_unit=\"MILLISECONDS\",value=\"%s\"}"
+            + " | max{%s} | transformNull{42} | keepLastValue{}",
+        TS_COLUMN, TOTAL_TRIPS_COLUMN, DEVICE_OS_COLUMN
+    );
+    runGroupedTimeSeriesQuery(query, 3, (ts, val, row) ->
+        assertEquals(val, ts <= DATA_START_TIME_SEC ? 42L : VIEWS_MAX_VALUE)
+    );
+  }
+
+  private void runGroupedTimeSeriesQuery(String query, int expectedGroups, TimeSeriesValidator validator) {
+    JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC, QUERY_END_TIME_SEC);
+    LOGGER.debug("Time series query response: {}", result);
+    assertEquals(result.get("status").asText(), "success");
+
+    JsonNode series = result.get("data").get("result");
+    assertEquals(series.size(), expectedGroups);
+
+    for (JsonNode row : series) {
+      for (JsonNode point : row.get("values")) {
+        long ts = point.get(0).asLong();
+        long val = point.get(1).asLong();
+        validator.validate(ts, val, row);
+      }
+    }
+  }
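+
+  // The helper above only relies on the following response fields; schematically (values
+  // illustrative, shape inferred from the accessors used, not copied from a real response):
+  //
+  //   {"status": "success",
+  //    "data": {"result": [
+  //      {"metric": {"referralCol": "1"}, "values": [[1747008060, 600], ...]},
+  //      {"metric": {"referralCol": "0"}, "values": [[1747008060, 900], ...]}]}}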
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    addTimeSeriesConfigurations(brokerConf);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    addTimeSeriesConfigurations(serverConf);
+  }
+
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @Override
+  public long getCountStarResult() {
+    return NUMBER_OF_ROWS;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addSingleValueDimension(TS_COLUMN, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DAYS_SINCE_FIRST_TRIP_COLUMN, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DEVICE_OS_COLUMN, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(REFERRAL_COLUMN, FieldSpec.DataType.BOOLEAN)
+        .addSingleValueDimension(TOTAL_TRIPS_COLUMN, FieldSpec.DataType.LONG)
+        .build();
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    LOGGER.info("Setting up integration test class: {}", getClass().getSimpleName());
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    if (_controllerRequestURLBuilder == null) {
+      _controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl("http://localhost:" + getControllerPort());
+    }
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+
+    File avroFile = createAvroFile();
+    // Create the offline table
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    waitForAllDocsLoaded(60_000);
+    LOGGER.info("Finished setting up integration test class: {}", getClass().getSimpleName());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    LOGGER.info("Tearing down integration test class: {}", getClass().getSimpleName());
+    dropOfflineTable(getTableName());
+    FileUtils.deleteDirectory(_tempDir);
+
+    // Shutdown the Pinot cluster
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    LOGGER.info("Finished tearing down integration test class: {}", getClass().getSimpleName());
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).build();
+  }
+
+  public File createAvroFile()
+      throws Exception {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+        createAvroField(TS_COLUMN, org.apache.avro.Schema.Type.LONG),
+        createAvroField(DAYS_SINCE_FIRST_TRIP_COLUMN, org.apache.avro.Schema.Type.LONG),
+        createAvroField(DEVICE_OS_COLUMN, org.apache.avro.Schema.Type.STRING),
+        createAvroField(REFERRAL_COLUMN, org.apache.avro.Schema.Type.BOOLEAN),
+        createAvroField(TOTAL_TRIPS_COLUMN, org.apache.avro.Schema.Type.LONG)
+    ));
+
+    File avroFile = new File(_tempDir, "data.avro");
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, avroFile);
+      for (int i = 0; i < getCountStarResult(); i++) {
+        writer.append(getRecord(avroSchema, i));
+      }
+    }
+    return avroFile;
+  }
+
+  private org.apache.avro.Schema.Field createAvroField(String name, org.apache.avro.Schema.Type type) {
+    return new org.apache.avro.Schema.Field(name, org.apache.avro.Schema.create(type), null, null);
+  }
+
+  private static GenericData.@NotNull Record getRecord(org.apache.avro.Schema avroSchema, int i) {
+    GenericData.Record record = new GenericData.Record(avroSchema);
+    // Start at DATA_START_TIME_SEC + 1 so no row lands exactly on DATA_START_TIME_SEC,
+    // which keeps the value assertions simple.
+    record.put(TS_COLUMN, (DATA_START_TIME_SEC + 1 + i) * 1000L);
+    record.put(DAYS_SINCE_FIRST_TRIP_COLUMN, i % 5);
+    record.put(DEVICE_OS_COLUMN, DEVICES[i % DEVICES.length]);
+    record.put(REFERRAL_COLUMN, (i % 2) == 0);
+    // Alternate between VIEWS_MIN_VALUE and VIEWS_MAX_VALUE.
+    record.put(TOTAL_TRIPS_COLUMN, VIEWS_MIN_VALUE + (VIEWS_MAX_VALUE - VIEWS_MIN_VALUE) * (i % 2));
+    return record;
+  }
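+
+  // Worked example of the generated data: rows arrive one per second starting at
+  // DATA_START_TIME_SEC + 1, so the first two records are
+  //   i = 0: ts=1747008001000, daysSinceFirstTrip=0, deviceOs="windows", referralCol=true,  totalTrips=20
+  //   i = 1: ts=1747008002000, daysSinceFirstTrip=1, deviceOs="android", referralCol=false, totalTrips=30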
+
+  private void addTimeSeriesConfigurations(PinotConfiguration conf) {
+    conf.setProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "m3ql");
+    conf.setProperty(PinotTimeSeriesConfiguration.getLogicalPlannerConfigKey("m3ql"),
+        "org.apache.pinot.tsdb.m3ql.M3TimeSeriesPlanner");
+    conf.setProperty(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey("m3ql"),
+        SimpleTimeSeriesBuilderFactory.class.getName());
+  }
+
+  @FunctionalInterface
+  interface TimeSeriesValidator {
+    void validate(long timestamp, long value, JsonNode row);
+  }
+}
diff --git a/pom.xml b/pom.xml
index e106ca70f9..c46882a2ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -582,6 +582,11 @@
       <artifactId>pinot-timeseries-planner</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-timeseries-m3ql</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- Pinot Plug-in Modules -->
     <!-- Batch Ingestion -->

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org