This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9cf9095c68c Improve CustomDataQueryClusterIntegrationTest to use 2
servers and 2 segments to ensure merge behavior works as expected. (#17284)
9cf9095c68c is described below
commit 9cf9095c68c709bd675dbff3571a48ba0f741059
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Dec 11 03:20:55 2025 -0800
Improve CustomDataQueryClusterIntegrationTest to use 2 servers and 2
segments to ensure merge behavior works as expected. (#17284)
1. Update the entire CustomDataQueryClusterIntegrationTest suite to use 2
servers and 2 segments so that query behaviors such as intermediate-result
serde, merge, and exchange are exercised as expected.
2. Change the SumArrayLong and SumArrayDouble aggregation functions'
intermediate result type to OBJECT so the DataTable serde works for them
automatically (see the sketch after the SumArrayLongAggregationFunction diff
below).
3. Fix an StUnion object merge issue (see the sketch after the
StUnionAggregationFunction diff below).
4. Fix a ULL merge issue: aggregating on a bytes column may produce ULL
sketches with different p values, and returning a ULL object with the default
p caused merge failures (a sketch of the merge-direction rule follows this
list).
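
For item 4, a minimal sketch (not part of this commit) of the merge-direction
rule the fix relies on. It assumes only the hash4j calls visible in the diff
below (UltraLogLog.create, getP, add); the exact exception type on the invalid
direction is an assumption:

    import com.dynatrace.hash4j.distinctcount.UltraLogLog;

    public class UllMergeDirectionSketch {
      public static void main(String[] args) {
        UltraLogLog smallP = UltraLogLog.create(12); // lower precision
        UltraLogLog largeP = UltraLogLog.create(14); // higher precision

        // Valid direction: fold the higher-p sketch into the lower-p one.
        UltraLogLog merged = smallP.add(largeP);
        System.out.println("merged p = " + merged.getP()); // 12

        // Invalid direction: hash4j rejects adding a lower-p sketch into a
        // higher-p one, hence merge() always adds into the smaller-p side.
        try {
          largeP.add(smallP);
        } catch (RuntimeException e) { // exact exception type assumed
          System.out.println("merge direction rejected: " + e);
        }
      }
    }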
---
.../DistinctCountULLAggregationFunction.java | 26 ++--
.../function/StUnionAggregationFunction.java | 37 +++++-
.../array/SumArrayDoubleAggregationFunction.java | 5 +-
.../array/SumArrayLongAggregationFunction.java | 5 +-
.../function/StUnionAggregationFunctionTest.java | 106 +++++++++++++++
.../tests/CLPEncodingRealtimeIntegrationTest.java | 4 +-
.../OfflineTimestampIndexIntegrationTest.java | 5 +-
.../pinot/integration/tests/custom/ArrayTest.java | 146 +++++++++++----------
.../integration/tests/custom/BytesTypeTest.java | 10 +-
.../integration/tests/custom/CpcSketchTest.java | 15 +--
.../CustomDataQueryClusterIntegrationTest.java | 55 ++++++++
.../tests/custom/FloatingPointDataTypeTest.java | 15 +--
.../integration/tests/custom/GeoSpatialTest.java | 12 +-
.../integration/tests/custom/JsonPathTest.java | 58 ++++----
.../tests/custom/MapFieldTypeRealtimeTest.java | 10 +-
.../integration/tests/custom/MapFieldTypeTest.java | 71 +++++-----
.../integration/tests/custom/MapTypeTest.java | 93 ++++++-------
.../tests/custom/MultiColumnTextIndicesTest.java | 10 +-
.../integration/tests/custom/SumPrecisionTest.java | 19 +--
.../integration/tests/custom/TextIndicesTest.java | 10 +-
.../integration/tests/custom/ThetaSketchTest.java | 22 ++--
.../integration/tests/custom/TimestampTest.java | 17 +--
.../integration/tests/custom/TupleSketchTest.java | 15 +--
.../pinot/integration/tests/custom/ULLTest.java | 34 ++---
.../tests/custom/UnnestIntegrationTest.java | 42 +++---
.../pinot/integration/tests/custom/VectorTest.java | 15 +--
.../integration/tests/custom/WindowFunnelTest.java | 3 +-
.../tests/tpch/generator/TPCHQueryGeneratorV2.java | 78 +++++------
28 files changed, 547 insertions(+), 391 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
index 1a036ae5342..edd3fcc41ed 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
@@ -22,6 +22,7 @@ import com.dynatrace.hash4j.distinctcount.UltraLogLog;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -305,11 +306,12 @@ public class DistinctCountULLAggregationFunction extends
BaseSingleInputAggregat
}
}
+ @Nullable
@Override
public UltraLogLog extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
Object result = aggregationResultHolder.getResult();
if (result == null) {
- return UltraLogLog.create(_p);
+ return null;
}
if (result instanceof DictIdsWrapper) {
@@ -321,11 +323,12 @@ public class DistinctCountULLAggregationFunction extends
BaseSingleInputAggregat
}
}
+ @Nullable
@Override
public UltraLogLog extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
Object result = groupByResultHolder.getResult(groupKey);
if (result == null) {
- return UltraLogLog.create(_p);
+ return null;
}
if (result instanceof DictIdsWrapper) {
@@ -337,13 +340,20 @@ public class DistinctCountULLAggregationFunction extends
BaseSingleInputAggregat
}
}
+ @Nullable
@Override
- public UltraLogLog merge(UltraLogLog intermediateResult1, UltraLogLog
intermediateResult2) {
- int largerP = Math.max(intermediateResult1.getP(),
intermediateResult2.getP());
- UltraLogLog merged = UltraLogLog.create(largerP);
- merged.add(intermediateResult1);
- merged.add(intermediateResult2);
- return merged;
+ public UltraLogLog merge(@Nullable UltraLogLog intermediateResult1,
@Nullable UltraLogLog intermediateResult2) {
+ if (intermediateResult1 == null) {
+ return intermediateResult2;
+ }
+ if (intermediateResult2 == null) {
+ return intermediateResult1;
+ }
+ // UltraLogLog object doesn't support merging a smaller p object into a
larger p object.
+ if (intermediateResult1.getP() > intermediateResult2.getP()) {
+ return intermediateResult2.add(intermediateResult1);
+ }
+ return intermediateResult1.add(intermediateResult2);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
index 3864899f819..6632d5a62d3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
@@ -34,6 +35,8 @@ import org.apache.pinot.segment.local.utils.GeometryUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.utils.ByteArray;
import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.util.GeometryCombiner;
+import org.locationtech.jts.operation.union.UnaryUnionOp;
public class StUnionAggregationFunction extends
BaseSingleInputAggregationFunction<Geometry, ByteArray> {
@@ -63,8 +66,7 @@ public class StUnionAggregationFunction extends
BaseSingleInputAggregationFuncti
byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV();
Geometry geometry = aggregationResultHolder.getResult();
for (int i = 0; i < length; i++) {
- Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
- geometry = geometry == null ? value : geometry.union(value);
+ geometry = union(geometry,
GeometrySerializer.deserialize(bytesArray[i]));
}
aggregationResultHolder.setValue(geometry);
}
@@ -77,7 +79,7 @@ public class StUnionAggregationFunction extends
BaseSingleInputAggregationFuncti
int groupKey = groupKeyArray[i];
Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
Geometry geometry = groupByResultHolder.getResult(groupKey);
- groupByResultHolder.setValueForKey(groupKey, geometry == null ? value :
geometry.union(value));
+ groupByResultHolder.setValueForKey(groupKey, union(geometry, value));
}
}
@@ -89,7 +91,7 @@ public class StUnionAggregationFunction extends
BaseSingleInputAggregationFuncti
Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
for (int groupKey : groupKeysArray[i]) {
Geometry geometry = groupByResultHolder.getResult(groupKey);
- groupByResultHolder.setValueForKey(groupKey, geometry == null ? value
: geometry.union(value));
+ groupByResultHolder.setValueForKey(groupKey, union(geometry, value));
}
}
}
@@ -107,8 +109,8 @@ public class StUnionAggregationFunction extends
BaseSingleInputAggregationFuncti
}
@Override
- public Geometry merge(Geometry intermediateResult1, Geometry
intermediateResult2) {
- return intermediateResult1.union(intermediateResult2);
+ public Geometry merge(@Nullable Geometry intermediateResult1, @Nullable
Geometry intermediateResult2) {
+ return union(intermediateResult1, intermediateResult2);
}
@Override
@@ -136,4 +138,27 @@ public class StUnionAggregationFunction extends
BaseSingleInputAggregationFuncti
public ByteArray extractFinalResult(Geometry geometry) {
return new ByteArray(GeometrySerializer.serialize(geometry));
}
+
+ /**
+ * Returns the union of the supplied geometries.
+ *
+ * <p>When either operand is a {@code GeometryCollection}, {@link
Geometry#union(Geometry)} can produce invalid
+ * topologies or drop components because it expects homogeneous inputs. The
{@link UnaryUnionOp} implementation is
+ * purpose-built for arbitrary collections, so we first combine the
components and delegate to it to ensure a valid
+ * and deterministic result.</p>
+ */
+ @Nullable
+ private static Geometry union(@Nullable Geometry left, @Nullable Geometry
right) {
+ if (left == null) {
+ return right;
+ }
+ if (right == null) {
+ return left;
+ }
+ if (Geometry.TYPENAME_GEOMETRYCOLLECTION.equals(left.getGeometryType())
+ ||
Geometry.TYPENAME_GEOMETRYCOLLECTION.equals(right.getGeometryType())) {
+ return UnaryUnionOp.union(GeometryCombiner.combine(left, right));
+ }
+ return left.union(right);
+ }
}
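
A standalone sketch of the decision the new union() helper makes, using only
JTS calls that appear in the diff above plus the standard WKTReader (class
name and WKT literals are illustrative):

    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.geom.util.GeometryCombiner;
    import org.locationtech.jts.io.WKTReader;
    import org.locationtech.jts.operation.union.UnaryUnionOp;

    public class StUnionHelperSketch {
      public static void main(String[] args) throws Exception {
        WKTReader reader = new WKTReader();
        Geometry polygon = reader.read("POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))");
        Geometry collection =
            reader.read("GEOMETRYCOLLECTION(POINT(30 30), LINESTRING(35 35, 40 40))");

        // Heterogeneous collections go through GeometryCombiner + UnaryUnionOp,
        // because Geometry#union expects homogeneous inputs.
        Geometry result;
        if (Geometry.TYPENAME_GEOMETRYCOLLECTION.equals(polygon.getGeometryType())
            || Geometry.TYPENAME_GEOMETRYCOLLECTION.equals(collection.getGeometryType())) {
          result = UnaryUnionOp.union(GeometryCombiner.combine(polygon, collection));
        } else {
          result = polygon.union(collection);
        }
        System.out.println(result);
      }
    }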
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
index df923c92d79..c67ecab1b43 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
@@ -138,7 +138,10 @@ public class SumArrayDoubleAggregationFunction
@Override
public DataSchema.ColumnDataType getIntermediateResultColumnType() {
- return DataSchema.ColumnDataType.DOUBLE_ARRAY;
+ // AggregationResultsBlock#setIntermediateResult and
AggregationFunctionUtils#getIntermediateResult only serialize
+ // intermediate results whose stored type is
INT/LONG/DOUBLE/STRING/OBJECT, so we need the OBJECT/custom ser-de path
+ // for DoubleArrayList payloads.
+ return DataSchema.ColumnDataType.OBJECT;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
index 0b3d3094e06..487504b9057 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
@@ -148,7 +148,10 @@ public class SumArrayLongAggregationFunction extends
BaseSingleInputAggregationF
@Override
public DataSchema.ColumnDataType getIntermediateResultColumnType() {
- return DataSchema.ColumnDataType.LONG_ARRAY;
+ // AggregationResultsBlock#setIntermediateResult and
AggregationFunctionUtils#getIntermediateResult only support
+ // INT/LONG/DOUBLE/STRING/OBJECT stored types, so LongArrayList must use
OBJECT to piggyback on the custom ser-de
+ // implemented in ObjectSerDeUtils.
+ return DataSchema.ColumnDataType.OBJECT;
}
@Override
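
To show why the OBJECT stored type matters in the two hunks above, a hedged
illustration of the DoubleArrayList intermediate these functions carry. The
element-wise merge below is an assumed shape for illustration, not copied
from the Pinot source; the point is that such object payloads only serialize
via the OBJECT/ObjectSerDeUtils path:

    import it.unimi.dsi.fastutil.doubles.DoubleArrayList;

    public class SumArrayDoubleIntermediateSketch {
      public static void main(String[] args) {
        // Two per-server intermediates for a sumArrayDouble-style aggregation.
        DoubleArrayList left = new DoubleArrayList(new double[]{1.0, 2.0, 3.0});
        DoubleArrayList right = new DoubleArrayList(new double[]{10.0, 20.0, 30.0});

        // Illustrative element-wise merge of the two intermediates.
        for (int i = 0; i < left.size(); i++) {
          left.set(i, left.getDouble(i) + right.getDouble(i));
        }
        System.out.println(left); // [11.0, 22.0, 33.0]
      }
    }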
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunctionTest.java
new file mode 100644
index 00000000000..79f0acfabe2
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunctionTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.SyntheticBlockValSets;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.segment.local.utils.GeometrySerializer;
+import org.apache.pinot.segment.local.utils.GeometryUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.util.GeometryCombiner;
+import org.locationtech.jts.operation.union.UnaryUnionOp;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class StUnionAggregationFunctionTest {
+ @Test
+ public void testMergeHandlesGeometryCollectionInputs()
+ throws Exception {
+ ExpressionContext expression =
ExpressionContext.forIdentifier("geometryColumn");
+ StUnionAggregationFunction aggregationFunction = new
StUnionAggregationFunction(List.of(expression));
+ Geometry polygon = GeometryUtils.GEOMETRY_WKT_READER.read("POLYGON((0 0, 0
5, 5 5, 5 0, 0 0))");
+ Geometry line = GeometryUtils.GEOMETRY_WKT_READER.read("LINESTRING(20 20,
25 25)");
+ Geometry geometryCollection =
+ GeometryUtils.GEOMETRY_WKT_READER.read("GEOMETRYCOLLECTION (POINT (30
30), LINESTRING (35 35, 40 40))");
+
+ Geometry intermediate = aggregationFunction.merge(polygon, line);
+ Geometry result = aggregationFunction.merge(intermediate,
geometryCollection);
+
+ Geometry expected = UnaryUnionOp.union(GeometryCombiner.combine(polygon,
line, geometryCollection));
+ assertEquals(result, expected);
+ }
+
+ @Test
+ public void testAggregateHandlesMixedDimensionSequence()
+ throws Exception {
+ ExpressionContext expression =
ExpressionContext.forIdentifier("geometryColumn");
+ StUnionAggregationFunction aggregationFunction = new
StUnionAggregationFunction(List.of(expression));
+
+ Geometry polygon = GeometryUtils.GEOMETRY_WKT_READER.read("POLYGON((0 0, 0
5, 5 5, 5 0, 0 0))");
+ Geometry line = GeometryUtils.GEOMETRY_WKT_READER.read("LINESTRING(10 10,
15 15)");
+ Geometry geometryCollection =
+ GeometryUtils.GEOMETRY_WKT_READER.read("GEOMETRYCOLLECTION (POINT (20
20), LINESTRING (25 25, 30 30))");
+
+ byte[][] values = new byte[][]{
+ GeometrySerializer.serialize(polygon),
+ GeometrySerializer.serialize(line),
+ GeometrySerializer.serialize(geometryCollection)
+ };
+
+ AggregationResultHolder aggregationResultHolder =
aggregationFunction.createAggregationResultHolder();
+ Map<ExpressionContext, BlockValSet> blockValSetMap =
+ Collections.singletonMap(expression, new BytesBlockValSet(values));
+
+ aggregationFunction.aggregate(values.length, aggregationResultHolder,
blockValSetMap);
+
+ Geometry expected = UnaryUnionOp.union(GeometryCombiner.combine(polygon,
line, geometryCollection));
+ assertEquals(aggregationResultHolder.getResult(), expected);
+ }
+
+ private static class BytesBlockValSet extends SyntheticBlockValSets.Base {
+ private final byte[][] _values;
+
+ private BytesBlockValSet(byte[][] values) {
+ _values = values;
+ }
+
+ @Override
+ public FieldSpec.DataType getValueType() {
+ return FieldSpec.DataType.BYTES;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return true;
+ }
+
+ @Override
+ public byte[][] getBytesValuesSV() {
+ return _values;
+ }
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
index cc762327ceb..dab36f2c1a9 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,7 +37,6 @@ import org.testng.annotations.Test;
public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTestSet {
private List<File> _avroFiles;
private FieldConfig.CompressionCodec _selectedCompressionCodec;
- private final Random _random = new Random();
@BeforeClass
public void setUp()
@@ -48,7 +46,7 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
// Randomly select CLP or CLPV2 compression codec
_selectedCompressionCodec =
- _random.nextBoolean() ? FieldConfig.CompressionCodec.CLP :
FieldConfig.CompressionCodec.CLPV2;
+ RANDOM.nextBoolean() ? FieldConfig.CompressionCodec.CLP :
FieldConfig.CompressionCodec.CLPV2;
// Start the Pinot cluster
startZk();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/OfflineTimestampIndexIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineTimestampIndexIntegrationTest.java
similarity index 97%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/OfflineTimestampIndexIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineTimestampIndexIntegrationTest.java
index 180b1abf1ad..1c040ebefdd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/OfflineTimestampIndexIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineTimestampIndexIntegrationTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests.custom;
+package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
@@ -24,9 +24,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
-import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TimestampConfig;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
index 6ddc610a087..80349aaa192 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
@@ -25,7 +25,6 @@ import java.io.File;
import java.util.List;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -234,32 +233,40 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
query =
String.format("SELECT "
- + "listAgg(stringCol, ' | ') WITHIN GROUP (ORDER BY stringCol) "
- + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+ + "listAgg(stringCol, ' | ') WITHIN GROUP (ORDER BY stringCol),
intCol "
+ + "FROM %s GROUP BY intCol LIMIT %d", getTableName(),
getCountStarResult());
jsonNode = postQuery(query);
rows = jsonNode.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- row = rows.get(0);
- assertEquals(row.size(), 1);
- String[] splits = row.get(0).asText().split(" \\| ");
- assertEquals(splits.length, getCountStarResult());
- for (int i = 1; i < splits.length; i++) {
- assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+ assertEquals(rows.size(), getCountStarResult() / 10);
+ for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+ row = rows.get(rowId);
+ assertEquals(row.size(), 2);
+ String[] splits = row.get(0).asText().split(" \\| ");
+ if (splits.length > 0) {
+ assertEquals(splits.length, 10);
+ for (int i = 1; i < splits.length; i++) {
+ assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+ }
+ }
}
query =
String.format("SELECT "
- + "listAgg(cast(doubleCol AS VARCHAR), ' | ') WITHIN GROUP (ORDER
BY doubleCol) "
- + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+ + "listAgg(cast(doubleCol AS VARCHAR), ' | ') WITHIN GROUP (ORDER
BY doubleCol), stringCol "
+ + "FROM %s GROUP BY stringCol LIMIT %d", getTableName(),
getCountStarResult());
jsonNode = postQuery(query);
rows = jsonNode.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- row = rows.get(0);
- assertEquals(row.size(), 1);
- splits = row.get(0).asText().split(" \\| ");
- assertEquals(splits.length, getCountStarResult());
- for (int i = 1; i < splits.length; i++) {
- assertTrue(Double.parseDouble(splits[i]) >= Double.parseDouble(splits[i
- 1]));
+ assertEquals(rows.size(), getCountStarResult() / 10);
+ for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+ row = rows.get(rowId);
+ assertEquals(row.size(), 2);
+ String[] splits = row.get(0).asText().split(" \\| ");
+ if (splits.length > 0) {
+ assertEquals(splits.length, 10);
+ for (int i = 1; i < splits.length; i++) {
+ assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+ }
+ }
}
}
@@ -341,32 +348,38 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
query =
String.format("SELECT "
- + "listAgg(DISTINCT stringCol, ' | ') WITHIN GROUP (ORDER BY
stringCol) "
- + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+ + "listAgg(DISTINCT stringCol, ' | ') WITHIN GROUP (ORDER BY
stringCol), intCol "
+ + "FROM %s GROUP BY intCol LIMIT %d", getTableName(),
getCountStarResult());
jsonNode = postQuery(query);
rows = jsonNode.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- row = rows.get(0);
- assertEquals(row.size(), 1);
- String[] splits = row.get(0).asText().split(" \\| ");
- assertEquals(splits.length, getCountStarResult() / 10);
- for (int j = 1; j < splits.length; j++) {
- assertTrue(splits[j].compareTo(splits[j - 1]) > 0);
+ assertEquals(rows.size(), getCountStarResult() / 10);
+ for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+ row = rows.get(rowId);
+ assertEquals(row.size(), 2);
+ String[] splits = row.get(0).asText().split(" \\| ");
+ if (splits.length > 1) {
+ assertEquals(splits.length, getCountStarResult());
+ for (int i = 1; i < splits.length; i++) {
+ assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+ }
+ }
}
query =
String.format("SELECT "
- + "listAgg(DISTINCT cast(doubleCol AS VARCHAR), ' | ') WITHIN
GROUP (ORDER BY doubleCol) "
- + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+ + "listAgg(DISTINCT cast(doubleCol AS VARCHAR), ' | ') WITHIN
GROUP (ORDER BY doubleCol), stringCol "
+ + "FROM %s GROUP BY stringCol LIMIT %d", getTableName(),
getCountStarResult());
jsonNode = postQuery(query);
rows = jsonNode.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- row = rows.get(0);
- assertEquals(row.size(), 1);
- splits = row.get(0).asText().split(" \\| ");
- assertEquals(splits.length, getCountStarResult() / 10);
- for (int j = 1; j < splits.length; j++) {
- assertTrue(Double.parseDouble(splits[j]) > Double.parseDouble(splits[j -
1]));
+ assertEquals(rows.size(), getCountStarResult() / 10);
+ for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+ row = rows.get(rowId);
+ assertEquals(row.size(), 2);
+ String[] splits = row.get(0).asText().split(" \\| ");
+ assertEquals(splits.length, 1);
+ for (int j = 1; j < splits.length; j++) {
+ assertTrue(Double.parseDouble(splits[j]) > Double.parseDouble(splits[j
- 1]));
+ }
}
}
@@ -845,7 +858,7 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
+ "GENERATE_ARRAY(1, 3, -1) "
+ "FROM %s LIMIT 1", getTableName());
JsonNode jsonNode = postQuery(query);
- assertEquals(jsonNode.get("exceptions").size(), 1);
+ assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
}
@Test(dataProvider = "useV1QueryEngine")
@@ -894,7 +907,7 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
+ "GENERATE_ARRAY(2147483648, 2147483650, -1) "
+ "FROM %s LIMIT 1", getTableName());
JsonNode jsonNode = postQuery(query);
- assertEquals(jsonNode.get("exceptions").size(), 1);
+ assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
}
@Test(dataProvider = "useV1QueryEngine")
@@ -944,7 +957,7 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
+ "GENERATE_ARRAY(0.3, 0.1, 1.1) "
+ "FROM %s LIMIT 1", getTableName());
JsonNode jsonNode = postQuery(query);
- assertEquals(jsonNode.get("exceptions").size(), 1);
+ assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
}
@Test(dataProvider = "useV1QueryEngine")
@@ -994,7 +1007,7 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
+ "GENERATE_ARRAY(CAST(0.3 AS DOUBLE), CAST(0.1 AS DOUBLE),
CAST(1.1 AS DOUBLE)) "
+ "FROM %s LIMIT 1", getTableName());
JsonNode jsonNode = postQuery(query);
- assertEquals(jsonNode.get("exceptions").size(), 1);
+ assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
}
@Test(dataProvider = "useBothQueryEngines")
@@ -1139,37 +1152,36 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
null, null)
));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
Cache<Integer, GenericData.Record> recordCache =
CacheBuilder.newBuilder().build();
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// add avro record to file
int finalI = i;
- fileWriter.append(recordCache.get((int) (i % (getCountStarResult() /
10)), () -> {
- // create avro record
- GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(BOOLEAN_COLUMN, finalI % 4 == 0 || finalI % 4 == 1);
- record.put(BOOLEAN_FROM_INT_COLUMN, finalI % 4 == 0 || finalI %
4 == 1 ? 1 : 0);
- record.put(BOOLEAN_FROM_STRING_COLUMN, finalI % 4 == 0 || finalI
% 4 == 1 ? "true" : "false");
- record.put(INT_COLUMN, finalI);
- record.put(LONG_COLUMN, finalI);
- record.put(FLOAT_COLUMN, finalI + RANDOM.nextFloat());
- record.put(DOUBLE_COLUMN, finalI + RANDOM.nextDouble());
- record.put(STRING_COLUMN, RandomStringUtils.random(finalI));
- record.put(TIMESTAMP_COLUMN, finalI);
- record.put(GROUP_BY_COLUMN, String.valueOf(finalI % 10));
- record.put(BOOLEAN_ARRAY_COLUMN, List.of(true, true, false,
false));
- record.put(BOOLEAN_FROM_INT_ARRAY_COLUMN, List.of(1, 1, 0, 0));
- record.put(BOOLEAN_FROM_STRING_ARRAY_COLUMN, List.of("true",
"true", "false", "false"));
- record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
- record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
- return record;
- }
- ));
+ writers.get(i % getNumAvroFiles())
+ .append(recordCache.get((int) (i % (getCountStarResult() / 10)),
() -> {
+ // create avro record
+ GenericData.Record record = new
GenericData.Record(avroSchema);
+ record.put(BOOLEAN_COLUMN, finalI % 4 == 0 || finalI % 4 ==
1);
+ record.put(BOOLEAN_FROM_INT_COLUMN, finalI % 4 == 0 ||
finalI % 4 == 1 ? 1 : 0);
+ record.put(BOOLEAN_FROM_STRING_COLUMN, finalI % 4 == 0 ||
finalI % 4 == 1 ? "true" : "false");
+ record.put(INT_COLUMN, finalI);
+ record.put(LONG_COLUMN, finalI);
+ record.put(FLOAT_COLUMN, finalI + RANDOM.nextFloat());
+ record.put(DOUBLE_COLUMN, finalI + RANDOM.nextDouble());
+ record.put(STRING_COLUMN, RandomStringUtils.random(finalI));
+ record.put(TIMESTAMP_COLUMN, finalI);
+ record.put(GROUP_BY_COLUMN, String.valueOf(finalI % 10));
+ record.put(BOOLEAN_ARRAY_COLUMN, List.of(true, true, false,
false));
+ record.put(BOOLEAN_FROM_INT_ARRAY_COLUMN, List.of(1, 1, 0,
0));
+ record.put(BOOLEAN_FROM_STRING_ARRAY_COLUMN, List.of("true",
"true", "false", "false"));
+ record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
+ record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
+ return record;
+ }
+ ));
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
index 40a2739745a..1c1d54ccb91 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.UUID;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.codec.binary.Hex;
import org.apache.pinot.common.function.scalar.DataTypeConversionFunctions;
import org.apache.pinot.common.function.scalar.StringFunctions;
@@ -124,9 +123,8 @@ public class BytesTypeTest extends
CustomDataQueryClusterIntegrationTest {
));
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
byte[] bytes = newRandomBytes(RANDOM.nextInt(100) * 2 + 2);
@@ -150,10 +148,10 @@ public class BytesTypeTest extends
CustomDataQueryClusterIntegrationTest {
record.put(RANDOM_BYTES, ByteBuffer.wrap(randomBytes));
record.put(FIXED_STRING, FIXED_HEX_STRIING_VALUE);
record.put(FIXED_BYTES,
ByteBuffer.wrap(DataTypeConversionFunctions.hexToBytes(FIXED_HEX_STRIING_VALUE)));
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
private static String newRandomBase64String() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
index 121fc335e38..1eb8809fcdf 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
@@ -23,10 +23,8 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
-import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -174,21 +172,18 @@ public class CpcSketchTest extends
CustomDataQueryClusterIntegrationTest {
null), new org.apache.avro.Schema.Field(MET_CPC_SKETCH_BYTES,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES),
null, null)));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- Random random = new Random();
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(ID, random.nextInt(10));
+ record.put(ID, RANDOM.nextInt(10));
record.put(MET_CPC_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
private byte[] getRandomRawValue() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 5c3b9b2cf96..72f7acdc9dd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -18,12 +18,17 @@
*/
package org.apache.pinot.integration.tests.custom;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
@@ -133,6 +138,12 @@ public abstract class
CustomDataQueryClusterIntegrationTest extends BaseClusterI
LOGGER.warn("Finished tearing down integration test class: {}",
getClass().getSimpleName());
}
+ @Override
+ protected void startServer()
+ throws Exception {
+ startServers(2);
+ }
+
@Override
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
@@ -262,7 +273,51 @@ public abstract class
CustomDataQueryClusterIntegrationTest extends BaseClusterI
public abstract List<File> createAvroFiles()
throws Exception;
+ public int getNumAvroFiles() {
+ return 2;
+ }
+
public boolean isRealtimeTable() {
return false;
}
+
+ protected AvroFilesAndWriters
createAvroFilesAndWriters(org.apache.avro.Schema avroSchema)
+ throws IOException {
+ List<File> avroFiles = new ArrayList<>();
+ List<DataFileWriter<GenericData.Record>> writers = new ArrayList<>();
+ for (int i = 0; i < getNumAvroFiles(); i++) {
+ File avroFile = new File(_tempDir, "data-" + i + ".avro");
+ avroFiles.add(avroFile);
+ DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new
GenericDatumWriter<>(avroSchema));
+ writers.add(fileWriter);
+ fileWriter.create(avroSchema, avroFile);
+ }
+ return new AvroFilesAndWriters(avroFiles, writers);
+ }
+
+ protected static class AvroFilesAndWriters implements Closeable {
+ private final List<File> _avroFiles;
+ private final List<DataFileWriter<GenericData.Record>> _writers;
+
+ AvroFilesAndWriters(List<File> avroFiles,
List<DataFileWriter<GenericData.Record>> writers) {
+ _avroFiles = avroFiles;
+ _writers = writers;
+ }
+
+ public List<File> getAvroFiles() {
+ return _avroFiles;
+ }
+
+ public List<DataFileWriter<GenericData.Record>> getWriters() {
+ return _writers;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ for (DataFileWriter<GenericData.Record> writer : _writers) {
+ writer.close();
+ }
+ }
+ }
}
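
A self-contained sketch of the round-robin multi-file pattern this helper
enables, in plain Avro without the Pinot test harness (file names, schema,
and doc count are illustrative):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;

    public class RoundRobinAvroSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.record("rec").fields().requiredInt("id").endRecord();
        int numFiles = 2; // mirrors getNumAvroFiles()
        List<DataFileWriter<GenericData.Record>> writers = new ArrayList<>();
        for (int i = 0; i < numFiles; i++) {
          DataFileWriter<GenericData.Record> writer =
              new DataFileWriter<>(new GenericDatumWriter<>(schema));
          writer.create(schema, new File("data-" + i + ".avro"));
          writers.add(writer);
        }
        // Spread records across files so every server/segment gets data.
        for (int i = 0; i < 10; i++) {
          GenericData.Record record = new GenericData.Record(schema);
          record.put("id", i);
          writers.get(i % numFiles).append(record);
        }
        for (DataFileWriter<GenericData.Record> writer : writers) {
          writer.close();
        }
      }
    }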
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
index f2576ae4910..83048027b7e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -96,12 +95,10 @@ public class FloatingPointDataTypeTest extends
CustomDataQueryClusterIntegration
new org.apache.avro.Schema.Field(MET_FLOAT_UNSORTED_NO_DIC,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE),
null, null)));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- double sortedValue = 0.0;
- double unsortedValue = 0.05;
+ double sortedValue = 0.0;
+ double unsortedValue = 0.05;
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < NUM_DOCS; i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
@@ -120,10 +117,10 @@ public class FloatingPointDataTypeTest extends
CustomDataQueryClusterIntegration
}
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
index 14cccfb0e34..3705ef70e9b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.core.geospatial.transform.function.ScalarFunctions;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.GeometryUtils;
@@ -161,10 +160,10 @@ public class GeoSpatialTest extends
CustomDataQueryClusterIntegrationTest {
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE),
null, null)
));
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
+ int fileIdx = Math.min(i / (NUM_TOTAL_DOCS / getNumAvroFiles()),
getNumAvroFiles() - 1);
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(DIM_NAME, "dim" + i);
Point point =
@@ -181,11 +180,10 @@ public class GeoSpatialTest extends
CustomDataQueryClusterIntegrationTest {
record.put(AREA_GEOM_SIZE_NAME, AREA_GEOM_SIZE_DATA[i %
AREA_GEOM_SIZE_DATA.length]);
record.put(AREA_GEOG_NAME, AREA_GEOG_DATA[i % AREA_GEOG_DATA.length]);
record.put(AREA_GEOG_SIZE_NAME, AREA_GEOG_SIZE_DATA[i %
AREA_GEOG_SIZE_DATA.length]);
- fileWriter.append(record);
+ writers.get(fileIdx).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
-
- return List.of(avroFile);
}
@Test(dataProvider = "useBothQueryEngines")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index 593b0c40301..dd19bd51348 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.common.function.JsonPathCache;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -51,18 +50,18 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
protected static final String DEFAULT_TABLE_NAME = "JsonPathTest";
- protected static final int NUM_TOTAL_DOCS = 1000;
+ protected static final int NUM_DOCS_PER_SEGMENT = 1000;
private static final String MY_MAP_STR_FIELD_NAME = "myMapStr";
private static final String MY_MAP_STR_K1_FIELD_NAME = "myMapStr_k1";
private static final String MY_MAP_STR_K2_FIELD_NAME = "myMapStr_k2";
private static final String COMPLEX_MAP_STR_FIELD_NAME = "complexMapStr";
private static final String COMPLEX_MAP_STR_K3_FIELD_NAME =
"complexMapStr_k3";
- protected final List<String> _sortedSequenceIds = new
ArrayList<>(NUM_TOTAL_DOCS);
+ protected final List<String> _sortedSequenceIds = new
ArrayList<>(NUM_DOCS_PER_SEGMENT);
@Override
protected long getCountStarResult() {
- return NUM_TOTAL_DOCS;
+ return (long) NUM_DOCS_PER_SEGMENT * getNumAvroFiles();
}
@Override
@@ -105,10 +104,8 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
org.apache.avro.Schema.Type.STRING), null, null));
avroSchema.setFields(fields);
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ for (int i = 0; i < NUM_DOCS_PER_SEGMENT; i++) {
Map<String, String> map = new HashMap<>();
map.put("k1", "value-k1-" + i);
map.put("k2", "value-k2-" + i);
@@ -123,13 +120,14 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
Map.of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i,
"k4-k3", "value-k4-k3-" + i,
"met", i));
record.put(COMPLEX_MAP_STR_FIELD_NAME,
JsonUtils.objectToString(complexMap));
- fileWriter.append(record);
+ for (DataFileWriter<GenericData.Record> writer :
avroFilesAndWriters.getWriters()) {
+ writer.append(record);
+ }
_sortedSequenceIds.add(String.valueOf(i));
}
+ Collections.sort(_sortedSequenceIds);
+ return avroFilesAndWriters.getAvroFiles();
}
- Collections.sort(_sortedSequenceIds);
-
- return List.of(avroFile);
}
@Test(dataProvider = "useBothQueryEngines")
@@ -199,19 +197,19 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
String value = rows.get(i).get(0).textValue();
Map<?, ?> results = JsonUtils.stringToObject(value, Map.class);
Assert.assertTrue(value.indexOf("-k1-") > 0);
- Assert.assertEquals(results.get("k1"), "value-k1-" + i);
- Assert.assertEquals(results.get("k2"), "value-k2-" + i);
+ Assert.assertEquals(results.get("k1"), "value-k1-" + i %
NUM_DOCS_PER_SEGMENT);
+ Assert.assertEquals(results.get("k2"), "value-k2-" + i %
NUM_DOCS_PER_SEGMENT);
final List<?> k3 = (List<?>) results.get("k3");
Assert.assertEquals(k3.size(), 3);
- Assert.assertEquals(k3.get(0), "value-k3-0-" + i);
- Assert.assertEquals(k3.get(1), "value-k3-1-" + i);
- Assert.assertEquals(k3.get(2), "value-k3-2-" + i);
+ Assert.assertEquals(k3.get(0), "value-k3-0-" + i % NUM_DOCS_PER_SEGMENT);
+ Assert.assertEquals(k3.get(1), "value-k3-1-" + i % NUM_DOCS_PER_SEGMENT);
+ Assert.assertEquals(k3.get(2), "value-k3-2-" + i % NUM_DOCS_PER_SEGMENT);
final Map<?, ?> k4 = (Map<?, ?>) results.get("k4");
Assert.assertEquals(k4.size(), 4);
- Assert.assertEquals(k4.get("k4-k1"), "value-k4-k1-" + i);
- Assert.assertEquals(k4.get("k4-k2"), "value-k4-k2-" + i);
- Assert.assertEquals(k4.get("k4-k3"), "value-k4-k3-" + i);
- Assert.assertEquals(Double.parseDouble(k4.get("met").toString()), i);
+ Assert.assertEquals(k4.get("k4-k1"), "value-k4-k1-" + i %
NUM_DOCS_PER_SEGMENT);
+ Assert.assertEquals(k4.get("k4-k2"), "value-k4-k2-" + i %
NUM_DOCS_PER_SEGMENT);
+ Assert.assertEquals(k4.get("k4-k3"), "value-k4-k3-" + i %
NUM_DOCS_PER_SEGMENT);
+ Assert.assertEquals(Double.parseDouble(k4.get("met").toString()), i %
NUM_DOCS_PER_SEGMENT);
}
//Filter Query
@@ -220,7 +218,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
Assert.assertNotNull(rows);
- Assert.assertEquals(rows.size(), 1);
+ Assert.assertEquals(rows.size(), getNumAvroFiles());
for (int i = 0; i < rows.size(); i++) {
String value = rows.get(i).get(0).textValue();
Map<?, ?> k4 = JsonUtils.stringToObject(value, Map.class);
@@ -233,7 +231,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
//selection order by
query = "Select complexMapStr from " + getTableName()
- + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING')
DESC LIMIT " + NUM_TOTAL_DOCS;
+ + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING')
DESC LIMIT " + NUM_DOCS_PER_SEGMENT;
pinotResponse = postQuery(query);
rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
Assert.assertNotNull(rows);
@@ -242,7 +240,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
String value = rows.get(i).get(0).textValue();
Assert.assertTrue(value.indexOf("-k1-") > 0);
Map<?, ?> results = JsonUtils.stringToObject(value, Map.class);
- String seqId = _sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
+ String seqId = _sortedSequenceIds.get(NUM_DOCS_PER_SEGMENT - 1 - i /
getNumAvroFiles());
Assert.assertEquals(results.get("k1"), "value-k1-" + seqId);
Assert.assertEquals(results.get("k2"), "value-k2-" + seqId);
final List<?> k3 = (List<?>) results.get("k3");
@@ -271,10 +269,10 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
Assert.assertNotNull(pinotResponse.get("resultTable").get("rows"));
ArrayNode rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
- String seqId = _sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
+ String seqId = _sortedSequenceIds.get(NUM_DOCS_PER_SEGMENT - 1 - i);
final JsonNode row = rows.get(i);
Assert.assertEquals(row.get(0).asText(), "value-k1-" + seqId);
- Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId));
+ Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId) *
getNumAvroFiles());
}
}
@@ -291,10 +289,10 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
Assert.assertNotNull(pinotResponse.get("resultTable").get("rows"));
ArrayNode rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
- String seqId = String.valueOf(NUM_TOTAL_DOCS - 1 - i);
+ String seqId = String.valueOf(NUM_DOCS_PER_SEGMENT - 1 - i);
final JsonNode row = rows.get(i);
Assert.assertEquals(row.get(0).asText(), "value-k1-" + seqId);
- Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId));
+ Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId) *
getNumAvroFiles());
}
}
@@ -313,7 +311,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
Assert.assertEquals(rows.size(), 1);
final JsonNode row = rows.get(0);
Assert.assertEquals(row.get(0).asText(), "defaultKey");
- Assert.assertEquals(row.get(1).asDouble(), 1000.0);
+ Assert.assertEquals(row.get(1).asDouble(), 1000.0 * getNumAvroFiles());
}
@Test(dataProvider = "useBothQueryEngines")
@@ -331,7 +329,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
Assert.assertEquals(rows.size(), 1);
final JsonNode row = rows.get(0);
Assert.assertEquals(row.get(0).asText(), "defaultKey");
- Assert.assertTrue(Math.abs(row.get(1).asDouble() - 100.0) < 1e-10);
+ Assert.assertTrue(Math.abs(row.get(1).asDouble() - 100.0 *
getNumAvroFiles()) < 1e-10);
}
@Test(dataProvider = "useBothQueryEngines")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
index 8d54850f23e..1273a28666e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -107,10 +106,9 @@ public class MapFieldTypeRealtimeTest extends
CustomDataQueryClusterIntegrationT
null));
avroSchema.setFields(fields);
- File avroFile = new File(_tempDir, "data.avro");
long tsBase = System.currentTimeMillis();
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < NUM_DOCS; i++) {
Map<String, String> stringMap = new HashMap<>();
Map<String, Integer> intMap = new HashMap<>();
@@ -123,10 +121,10 @@ public class MapFieldTypeRealtimeTest extends
CustomDataQueryClusterIntegrationT
record.put(STRING_MAP_FIELD_NAME, stringMap);
record.put(INT_MAP_FIELD_NAME, intMap);
record.put(TIMESTAMP_FIELD_NAME, tsBase + i);
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
protected int getSelectionDefaultDocCount() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
index 2974d20482f..7c8a3904485 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -48,14 +47,14 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
// Default settings
private static final int V1_DEFAULT_SELECTION_COUNT = 10;
protected static final String DEFAULT_TABLE_NAME = "MapFieldTypeTest";
- private static final int NUM_DOCS = 1000;
+ private static final int NUM_DOCS_PER_SEGMENT = 1000;
private static final String STRING_MAP_FIELD_NAME = "stringMap";
private static final String INT_MAP_FIELD_NAME = "intMap";
private int _setSelectionDefaultDocCount = 10;
@Override
protected long getCountStarResult() {
- return NUM_DOCS;
+ return (long) NUM_DOCS_PER_SEGMENT * getNumAvroFiles();
}
@Override
@@ -141,10 +140,8 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
new org.apache.avro.Schema.Field(INT_MAP_FIELD_NAME,
intMapAvroSchema, null, null));
avroSchema.setFields(fields);
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- for (int i = 0; i < NUM_DOCS; i++) {
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ for (int i = 0; i < NUM_DOCS_PER_SEGMENT; i++) {
Map<String, String> stringMap = new HashMap<>();
Map<String, Integer> intMap = new HashMap<>();
for (int j = 0; j < i; j++) {
@@ -155,10 +152,12 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(STRING_MAP_FIELD_NAME, stringMap);
record.put(INT_MAP_FIELD_NAME, intMap);
- fileWriter.append(record);
+ for (DataFileWriter<GenericData.Record> writer :
avroFilesAndWriters.getWriters()) {
+ writer.append(record);
+ }
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
protected int getSelectionDefaultDocCount() {
@@ -178,9 +177,9 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
JsonNode intMap = rows.get(i).get(0);
JsonNode stringMap = rows.get(i).get(1);
- for (int j = 0; j < i; j++) {
- assertEquals(intMap.get("k" + j).intValue(), i);
- assertEquals(stringMap.get("k" + j).textValue(), "v" + i);
+ for (int j = 0; j < i % NUM_DOCS_PER_SEGMENT; j++) {
+ assertEquals(intMap.get("k" + j).intValue(), i % NUM_DOCS_PER_SEGMENT);
+ assertEquals(stringMap.get("k" + j).textValue(), "v" + i %
NUM_DOCS_PER_SEGMENT);
}
}
// Selection only
@@ -192,9 +191,13 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
assertEquals(rows.get(0).get(0).textValue(), "null");
assertEquals(rows.get(0).get(1).intValue(), -2147483648);
- for (int i = 1; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).textValue(), "v" + i);
- assertEquals(rows.get(i).get(1).intValue(), i);
+ for (int i = 1; i < NUM_DOCS_PER_SEGMENT; i++) {
+ for (int j = 0; j < getNumAvroFiles(); j++) {
+ if (i + NUM_DOCS_PER_SEGMENT < getSelectionDefaultDocCount()) {
+ assertEquals(rows.get(i + NUM_DOCS_PER_SEGMENT *
j).get(0).textValue(), "v" + i);
+ assertEquals(rows.get(i + NUM_DOCS_PER_SEGMENT *
j).get(1).intValue(), i);
+ }
+ }
}
// Selection order-by
@@ -208,22 +211,28 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
assertEquals(rows.get(0).get(0).intValue(), -2147483648);
assertEquals(rows.get(0).get(1).intValue(), -2147483648);
assertEquals(rows.get(0).get(2).textValue(), "null");
- assertEquals(rows.get(1).get(0).intValue(), 1);
+ assertEquals(rows.get(1).get(0).intValue(), -2147483648);
assertEquals(rows.get(1).get(1).intValue(), -2147483648);
- assertEquals(rows.get(1).get(2).textValue(), "v1");
- for (int i = 2; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), i);
- assertEquals(rows.get(i).get(1).intValue(), i);
- assertEquals(rows.get(i).get(2).textValue(), "v" + i);
- assertEquals(rows.get(i).get(3).textValue(), "v" + i);
+ assertEquals(rows.get(1).get(2).textValue(), "null");
+ assertEquals(rows.get(2).get(0).intValue(), 1);
+ assertEquals(rows.get(2).get(1).intValue(), -2147483648);
+ assertEquals(rows.get(2).get(2).textValue(), "v1");
+ assertEquals(rows.get(3).get(0).intValue(), 1);
+ assertEquals(rows.get(3).get(1).intValue(), -2147483648);
+ assertEquals(rows.get(3).get(2).textValue(), "v1");
+ for (int i = 4; i < getSelectionDefaultDocCount(); i++) {
+ assertEquals(rows.get(i).get(0).intValue(), i / getNumAvroFiles());
+ assertEquals(rows.get(i).get(1).intValue(), i / getNumAvroFiles());
+ assertEquals(rows.get(i).get(2).textValue(), "v" + i /
getNumAvroFiles());
+ assertEquals(rows.get(i).get(3).textValue(), "v" + i /
getNumAvroFiles());
}
// Aggregation only
query = "SELECT MAX(intMap['k0']), MAX(intMap['k1']) FROM " +
getTableName();
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
-    assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(0).intValue(), NUM_DOCS - 1);
-    assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(1).intValue(), NUM_DOCS - 1);
+    assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT - 1);
+    assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(1).intValue(), NUM_DOCS_PER_SEGMENT - 1);
// Aggregation group-by
query = "SELECT stringMap['k0'] AS key, MIN(intMap['k0']) AS value FROM "
+ getTableName()
@@ -231,10 +240,10 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), getSelectionDefaultDocCount());
+ assertEquals(rows.size(), Math.min(getSelectionDefaultDocCount(),
NUM_DOCS_PER_SEGMENT));
assertEquals(rows.get(0).get(0).textValue(), "null");
assertEquals(rows.get(0).get(1).intValue(), Integer.MIN_VALUE);
- for (int i = 1; i < getSelectionDefaultDocCount(); i++) {
+ for (int i = 1; i < Math.min(getSelectionDefaultDocCount(),
NUM_DOCS_PER_SEGMENT); i++) {
assertEquals(rows.get(i).get(0).textValue(), "v" + i);
assertEquals(rows.get(i).get(1).intValue(), i);
}
@@ -244,14 +253,14 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
+ assertEquals(rows.size(), getNumAvroFiles());
assertEquals(rows.get(0).get(0).textValue(), "v25");
query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE
intMap['k1'] = 25";
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
+ assertEquals(rows.size(), getNumAvroFiles());
assertEquals(rows.get(0).get(0).intValue(), 25);
// Filter on non-existing key
@@ -324,7 +333,7 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
assertEquals(pinotResponse.get("exceptions").size(), 0);
JsonNode rows = pinotResponse.get("resultTable").get("rows");
// Only records with k1 = 'v25' or 'v26' should be returned
- assertEquals(rows.size(), 2);
+ assertEquals(rows.size(), 2 * getNumAvroFiles());
// Verify the returned values
for (int i = 0; i < rows.size(); i++) {
@@ -338,7 +347,7 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
// Only records with k1 = 25 or 26 should be returned
- assertEquals(rows.size(), 2);
+ assertEquals(rows.size(), 2 * getNumAvroFiles());
// Verify the returned values
for (int i = 0; i < rows.size(); i++) {
@@ -443,6 +452,6 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
@Override
protected void setUseMultiStageQueryEngine(boolean useMultiStageQueryEngine)
{
super.setUseMultiStageQueryEngine(useMultiStageQueryEngine);
- _setSelectionDefaultDocCount = useMultiStageQueryEngine ? NUM_DOCS :
V1_DEFAULT_SELECTION_COUNT;
+ _setSelectionDefaultDocCount = useMultiStageQueryEngine ? (int)
getCountStarResult() : V1_DEFAULT_SELECTION_COUNT;
}
}
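
The MapFieldTypeTest hunks above (and most of the test diffs that follow) replace the single DataFileWriter with a createAvroFilesAndWriters(avroSchema) helper used in try-with-resources. The helper itself is added to CustomDataQueryClusterIntegrationTest (visible in the diffstat, not in this section), so the sketch below is only an assumed shape: the data_<i>.avro naming, the constructor, and the numFiles parameter are guesses standing in for the real implementation.

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;

public class AvroFilesAndWriters implements AutoCloseable {
  private final List<File> _avroFiles = new ArrayList<>();
  private final List<DataFileWriter<GenericData.Record>> _writers = new ArrayList<>();

  // One file/writer pair per segment; numFiles stands in for getNumAvroFiles().
  public AvroFilesAndWriters(File tempDir, Schema avroSchema, int numFiles)
      throws IOException {
    for (int i = 0; i < numFiles; i++) {
      File avroFile = new File(tempDir, "data_" + i + ".avro");
      DataFileWriter<GenericData.Record> writer =
          new DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
      writer.create(avroSchema, avroFile);
      _avroFiles.add(avroFile);
      _writers.add(writer);
    }
  }

  public List<File> getAvroFiles() {
    return _avroFiles;
  }

  public List<DataFileWriter<GenericData.Record>> getWriters() {
    return _writers;
  }

  @Override
  public void close() throws IOException {
    // try-with-resources in the tests relies on this to flush and seal every
    // Avro file after the records are appended; the files are safe to return
    // to the caller once the try block exits.
    for (DataFileWriter<GenericData.Record> writer : _writers) {
      writer.close();
    }
  }
}
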
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
index fdf88db4108..55285092bc8 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -46,7 +45,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
// Default settings
protected static final String DEFAULT_TABLE_NAME = "MapTypeTest";
- private static final int NUM_DOCS = 1000;
+ private static final int NUM_DOCS_PER_SEGMENT = 1000;
private static final String STRING_KEY_MAP_FIELD_NAME = "stringKeyMap";
private static final String INT_KEY_MAP_FIELD_NAME = "intKeyMap";
private static final String STRING_KEY_MAP_STR_FIELD_NAME =
"stringKeyMapStr";
@@ -55,7 +54,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
@Override
protected long getCountStarResult() {
- return NUM_DOCS;
+ return (long) NUM_DOCS_PER_SEGMENT * getNumAvroFiles();
}
@Override
@@ -99,24 +98,23 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
new org.apache.avro.Schema.Field(INT_KEY_MAP_FIELD_NAME,
intKeyMapAvroSchema, null, null));
avroSchema.setFields(fields);
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- for (int i = 0; i < NUM_DOCS; i++) {
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ for (int i = 0; i < NUM_DOCS_PER_SEGMENT; i++) {
Map<String, Integer> stringKeyMap = new HashMap<>();
stringKeyMap.put("k1", i);
- stringKeyMap.put("k2", NUM_DOCS + i);
+ stringKeyMap.put("k2", NUM_DOCS_PER_SEGMENT + i);
Map<Integer, String> intKeyMap = new HashMap<>();
intKeyMap.put(95, Integer.toString(i));
- intKeyMap.put(717, Integer.toString(NUM_DOCS + i));
+ intKeyMap.put(717, Integer.toString(NUM_DOCS_PER_SEGMENT + i));
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(STRING_KEY_MAP_FIELD_NAME, stringKeyMap);
record.put(INT_KEY_MAP_FIELD_NAME, intKeyMap);
- fileWriter.append(record);
+ for (DataFileWriter<GenericData.Record> writer :
avroFilesAndWriters.getWriters()) {
+ writer.append(record);
+ }
}
+ return avroFilesAndWriters.getAvroFiles();
}
-
- return List.of(avroFile);
}
protected int getSelectionDefaultDocCount() {
@@ -134,7 +132,8 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
JsonNode rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).textValue(),
String.format("{\"k1\":%d,\"k2\":%d}", i, NUM_DOCS + i));
+ assertEquals(rows.get(i).get(0).textValue(),
String.format("{\"k1\":%d,\"k2\":%d}", i % NUM_DOCS_PER_SEGMENT,
+ NUM_DOCS_PER_SEGMENT + i % NUM_DOCS_PER_SEGMENT));
}
query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k1', 'INT') FROM " +
getTableName();
pinotResponse = postQuery(query);
@@ -142,7 +141,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), i);
+ assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
}
query = "SELECT jsonExtractScalar(intKeyMapStr, '$.95', 'INT') FROM " +
getTableName();
pinotResponse = postQuery(query);
@@ -150,7 +149,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), i);
+ assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
}
// Selection order-by
@@ -161,7 +160,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i /
getNumAvroFiles());
}
query = "SELECT jsonExtractScalar(intKeyMapStr, '$.717', 'INT') FROM " +
getTableName()
+ " ORDER BY jsonExtractScalar(intKeyMapStr, '$.95', 'INT')";
@@ -170,7 +169,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i /
getNumAvroFiles());
}
// Aggregation only
@@ -178,12 +177,12 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
JsonNode aggregationResult =
pinotResponse.get("resultTable").get("rows").get(0).get(0);
- assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+ assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
query = "SELECT MAX(jsonExtractScalar(intKeyMapStr, '$.95', 'INT')) FROM "
+ getTableName();
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
aggregationResult =
pinotResponse.get("resultTable").get("rows").get(0).get(0);
- assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+ assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
// Aggregation group-by
query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k1', 'INT') AS key, "
@@ -192,10 +191,11 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), getSelectionDefaultDocCount());
- for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+ int expectedReturnRows = Math.min(getSelectionDefaultDocCount(),
NUM_DOCS_PER_SEGMENT);
+ assertEquals(rows.size(), expectedReturnRows);
+ for (int i = 0; i < expectedReturnRows; i++) {
assertEquals(rows.get(i).get(0).intValue(), i);
- assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
}
query = "SELECT jsonExtractScalar(intKeyMapStr, '$.95', 'INT') AS key, "
+ "MIN(jsonExtractScalar(intKeyMapStr, '$.717', 'INT')) AS value FROM
" + getTableName()
@@ -203,10 +203,10 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), getSelectionDefaultDocCount());
- for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+ assertEquals(rows.size(), expectedReturnRows);
+ for (int i = 0; i < expectedReturnRows; i++) {
assertEquals(rows.get(i).get(0).intValue(), i);
- assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
}
// Filter
@@ -215,15 +215,15 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+ assertEquals(rows.size(), getNumAvroFiles());
+ assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
query = "SELECT jsonExtractScalar(intKeyMapStr, '$.717', 'INT') FROM " +
getTableName()
+ " WHERE jsonExtractScalar(intKeyMapStr, '$.95', 'INT') = 25";
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+ assertEquals(rows.size(), getNumAvroFiles());
+ assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
// Select non-existing key (illegal query)
query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k3', 'INT') FROM " +
getTableName();
@@ -253,7 +253,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
JsonNode rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), i);
+ assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
}
query = "SELECT mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) FROM " +
getTableName();
pinotResponse = postQuery(query);
@@ -261,7 +261,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), i);
+ assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
}
// Selection order-by
@@ -272,7 +272,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i /
getNumAvroFiles());
}
query = "SELECT mapValue(intKeyMap__KEYS, 717, intKeyMap__VALUES) FROM " +
getTableName()
+ " ORDER BY mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES)";
@@ -281,7 +281,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), getSelectionDefaultDocCount());
for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
- assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i /
getNumAvroFiles());
}
// Aggregation only
@@ -289,12 +289,12 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
JsonNode aggregationResult =
pinotResponse.get("resultTable").get("rows").get(0).get(0);
- assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+ assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
query = "SELECT MAX(mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES)) FROM
" + getTableName();
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
aggregationResult =
pinotResponse.get("resultTable").get("rows").get(0).get(0);
- assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+ assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
// Aggregation group-by
query = "SELECT mapValue(stringKeyMap__KEYS, 'k1', stringKeyMap__VALUES)
AS key, "
@@ -303,10 +303,11 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), getSelectionDefaultDocCount());
- for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+ int expectedReturnRows = Math.min(getSelectionDefaultDocCount(),
NUM_DOCS_PER_SEGMENT);
+ assertEquals(rows.size(), expectedReturnRows);
+ for (int i = 0; i < expectedReturnRows; i++) {
assertEquals(rows.get(i).get(0).intValue(), i);
- assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
}
query = "SELECT mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) AS key, "
+ "MIN(mapValue(intKeyMap__KEYS, 717, intKeyMap__VALUES)) AS value
FROM " + getTableName()
@@ -314,10 +315,10 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), getSelectionDefaultDocCount());
- for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+ assertEquals(rows.size(), expectedReturnRows);
+ for (int i = 0; i < expectedReturnRows; i++) {
assertEquals(rows.get(i).get(0).intValue(), i);
- assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+ assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
}
// Filter
@@ -326,15 +327,15 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+ assertEquals(rows.size(), getNumAvroFiles());
+ assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
query = "SELECT mapValue(intKeyMap__KEYS, 717, intKeyMap__VALUES) FROM " +
getTableName()
+ " WHERE mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) = 25";
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
- assertEquals(rows.size(), 1);
- assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+ assertEquals(rows.size(), getNumAvroFiles());
+ assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
// Filter on non-existing key
query = "SELECT mapValue(stringKeyMap__KEYS, 'k2', stringKeyMap__VALUES)
FROM " + getTableName()
@@ -414,6 +415,6 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
@Override
protected void setUseMultiStageQueryEngine(boolean useMultiStageQueryEngine)
{
super.setUseMultiStageQueryEngine(useMultiStageQueryEngine);
- _setSelectionDefaultDocCount = useMultiStageQueryEngine ? 1000 : 10;
+ _setSelectionDefaultDocCount = useMultiStageQueryEngine ?
NUM_DOCS_PER_SEGMENT * getNumAvroFiles() : 10;
}
}
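
Note the two distribution strategies in this patch: MapFieldTypeTest and MapTypeTest append every record to every writer (each segment holds a full copy, so a point filter such as mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) = 25 now matches getNumAvroFiles() rows), while the remaining tests round-robin records with writers.get(i % getNumAvroFiles()). A toy model of why the assertions change the way they do, with made-up sizes:

import java.util.ArrayList;
import java.util.List;

public class DistributionDemo {
  public static void main(String[] args) {
    int numFiles = 2;                // stand-in for getNumAvroFiles()
    int numDocsPerSegment = 1000;    // stand-in for NUM_DOCS_PER_SEGMENT

    // Strategy 1 (MapFieldTypeTest, MapTypeTest): duplicate every record
    // into every file. COUNT(*) doubles, per-key MAX/MIN are unchanged, and
    // a point filter matches one row per file.
    List<List<Integer>> duplicated = new ArrayList<>();
    for (int f = 0; f < numFiles; f++) {
      duplicated.add(new ArrayList<>());
    }
    for (int i = 0; i < numDocsPerSegment; i++) {
      for (List<Integer> file : duplicated) {
        file.add(i);
      }
    }

    // Strategy 2 (TextIndicesTest, SumPrecisionTest, ...): round-robin.
    // The same total row count, but each file only sees every second record.
    List<List<Integer>> roundRobin = new ArrayList<>();
    for (int f = 0; f < numFiles; f++) {
      roundRobin.add(new ArrayList<>());
    }
    for (int i = 0; i < numFiles * numDocsPerSegment; i++) {
      roundRobin.get(i % numFiles).add(i);
    }

    // Both strategies yield numFiles * numDocsPerSegment total rows, which
    // is exactly what getCountStarResult() now returns.
    System.out.println(duplicated.stream().mapToInt(List::size).sum());   // 2000
    System.out.println(roundRobin.stream().mapToInt(List::size).sum());   // 2000
  }
}
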
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
index cea7e334061..ed1f439401d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
@@ -37,7 +37,6 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
@@ -230,7 +229,6 @@ public class MultiColumnTextIndicesTest extends
CustomDataQueryClusterIntegratio
}
assertEquals(skills.size(), NUM_SKILLS);
- File avroFile = new File(_tempDir, "data.avro");
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(Arrays.asList(
new Field(NULLABLE_TEXT_COL, createUnion(create(Type.NULL),
create(Type.STRING)), null, null),
@@ -248,8 +246,8 @@ public class MultiColumnTextIndicesTest extends
CustomDataQueryClusterIntegratio
List<String> valueList = List.of("value");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < NUM_RECORDS; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(NULLABLE_TEXT_COL, (i & 1) == 1 ? null : "value");
@@ -264,10 +262,10 @@ public class MultiColumnTextIndicesTest extends
CustomDataQueryClusterIntegratio
record.put(DICT_TEXT_COL_CASE_SENSITIVE_MV, List.of(skills.get(i %
NUM_SKILLS), "" + i));
record.put(TEXT_COL_NATIVE, skills.get(i % NUM_SKILLS));
record.put(TIME_COL, System.currentTimeMillis());
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
index 2687f0529d9..7bd54a4d195 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
@@ -24,10 +24,8 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -81,17 +79,14 @@ public class SumPrecisionTest extends
CustomDataQueryClusterIntegrationTest {
new org.apache.avro.Schema.Field(MET_LONG,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
null, null)));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- int dimCardinality = 50;
- BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
- Random random = new Random();
+ int dimCardinality = 50;
+ BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(DIM_NAME, "dim" + (random.nextInt() % dimCardinality));
+ record.put(DIM_NAME, "dim" + (RANDOM.nextInt() % dimCardinality));
BigDecimal bigDecimalValue = bigDecimalBase.add(BigDecimal.valueOf(i));
record.put(MET_BIG_DECIMAL_BYTES,
ByteBuffer.wrap(BigDecimalUtils.serialize(bigDecimalValue)));
@@ -100,10 +95,10 @@ public class SumPrecisionTest extends
CustomDataQueryClusterIntegrationTest {
record.put(MET_LONG, bigDecimalValue.longValue());
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
index 09d63a027f2..34d374e4a70 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
@@ -33,7 +33,6 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -185,7 +184,6 @@ public class TextIndicesTest extends
CustomDataQueryClusterIntegrationTest {
}
assertEquals(skills.size(), NUM_SKILLS);
- File avroFile = new File(_tempDir, "data.avro");
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(Arrays.asList(
new Field(TEXT_COLUMN_NULL_NAME, createUnion(create(Type.NULL),
create(Type.STRING)), null, null),
@@ -197,8 +195,8 @@ public class TextIndicesTest extends
CustomDataQueryClusterIntegrationTest {
create(Type.STRING), null, null),
new Field(TIME_COLUMN_NAME,
create(Type.LONG), null, null)));
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < NUM_RECORDS; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(TEXT_COLUMN_NULL_NAME, i % 2 == 0 ? null : skills.get(i %
NUM_SKILLS));
@@ -206,10 +204,10 @@ public class TextIndicesTest extends
CustomDataQueryClusterIntegrationTest {
record.put(TEXT_COLUMN_NAME_CASE_SENSITIVE, skills.get(i %
NUM_SKILLS));
record.put(TEXT_COLUMN_NAME_NATIVE, skills.get(i % NUM_SKILLS));
record.put(TIME_COLUMN_NAME, System.currentTimeMillis());
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
index 43c5a79ee0d..04f2964de89 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.datasketches.theta.UpdateSketch;
@@ -99,13 +98,11 @@ public class ThetaSketchTest extends
CustomDataQueryClusterIntegrationTest {
new org.apache.avro.Schema.Field(THETA_SKETCH,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES),
null, null)));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
-
- int studentId = 0;
- int cardinality = 50;
+ int studentId = 0;
+ int cardinality = 50;
+ int recordId = 0;
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int shardId = 0; shardId < 2; shardId++) {
// populate student-course data (studentId, gender, course) for this
shard id
@@ -142,7 +139,8 @@ public class ThetaSketchTest extends
CustomDataQueryClusterIntegrationTest {
record.put(THETA_SKETCH,
ByteBuffer.wrap(sketch.compact().toByteArray()));
// add avro record to file
- fileWriter.append(record);
+ writers.get(recordId % getNumAvroFiles()).append(record);
+ recordId++;
}
// [course dimension] calculate theta sketches & add them to avro file
@@ -164,12 +162,12 @@ public class ThetaSketchTest extends
CustomDataQueryClusterIntegrationTest {
record.put(THETA_SKETCH,
ByteBuffer.wrap(sketch.compact().toByteArray()));
// add avro record to file
- fileWriter.append(record);
+ writers.get(recordId % getNumAvroFiles()).append(record);
+ recordId++;
}
}
+ return avroFilesAndWriters.getAvroFiles();
}
-
- return List.of(avroFile);
}
@Test(dataProvider = "useV1QueryEngine")
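
ThetaSketchTest emits records from nested loops (two shard ids, then per-gender and per-course sketches), so no single loop index identifies a record; the patch threads an explicit recordId counter through the loops to keep the round-robin even. A self-contained toy version of that pattern:

import java.util.ArrayList;
import java.util.List;

public class CounterRoundRobin {
  public static void main(String[] args) {
    int numFiles = 2;  // stand-in for getNumAvroFiles()
    List<List<String>> files = new ArrayList<>();
    for (int f = 0; f < numFiles; f++) {
      files.add(new ArrayList<>());
    }
    // Records are produced by nested loops, so no single loop index can
    // drive the round-robin; thread one counter through all of them.
    int recordId = 0;
    for (int shardId = 0; shardId < 2; shardId++) {
      for (int sketchId = 0; sketchId < 3; sketchId++) {
        files.get(recordId % numFiles).add("shard" + shardId + "-sketch" + sketchId);
        recordId++;
      }
    }
    files.forEach(System.out::println);  // records alternate between the two files
  }
}
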
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
index b86f384bc46..f218edf6423 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
@@ -27,7 +27,6 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.common.function.DateTimeUtils;
import org.apache.pinot.common.function.scalar.DateTimeFunctions;
import org.apache.pinot.spi.data.FieldSpec;
@@ -89,8 +88,8 @@ public class TimestampTest extends
CustomDataQueryClusterIntegrationTest {
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query =
- String.format("SELECT tsBase, tsHalfDayAfter,
longBase,longHalfDayAfter FROM %s LIMIT %d", getTableName(),
- getCountStarResult());
+ String.format("SELECT tsBase, tsHalfDayAfter,
longBase,longHalfDayAfter FROM %s ORDER BY tsBase LIMIT %d",
+ getTableName(), getCountStarResult());
JsonNode jsonNode = postQuery(query);
long expectedTsBase = DateTimeFunctions.fromDateTime("2019-01-01
00:00:00", "yyyy-MM-dd HH:mm:ss");
long expectedTsHalfDayAfter = DateTimeFunctions.fromDateTime("2019-01-01
12:00:00", "yyyy-MM-dd HH:mm:ss");
@@ -489,12 +488,10 @@ public class TimestampTest extends
CustomDataQueryClusterIntegrationTest {
new Field(YYYY_MM_DD_ONE_YEAR_AFTER, create(Type.STRING), null, null)
));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
ISOChronology chronology = ISOChronology.getInstanceUTC();
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- long tsBaseLong = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00",
"yyyy-MM-dd HH:mm:ss");
+ long tsBaseLong = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00",
"yyyy-MM-dd HH:mm:ss");
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// Generate data
long tsHalfDayAfter = DateTimeUtils.getTimestampField(chronology,
"HOUR").add(tsBaseLong, 12);
@@ -529,10 +526,10 @@ public class TimestampTest extends
CustomDataQueryClusterIntegrationTest {
record.put(YYYY_MM_DD_ONE_YEAR_AFTER,
DateTimeFunctions.toDateTime(tsOneYearAfter, "yyyy-MM-dd"));
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
tsBaseLong += 86400000;
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
}
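
The TimestampTest query change is about determinism rather than correctness: once segments sit on two servers, the broker may interleave their rows in any order, so the test adds ORDER BY tsBase before asserting values row by row. A toy model of the failure mode:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderByDemo {
  public static void main(String[] args) {
    List<Long> segment1 = List.of(0L, 2L, 4L);  // rows served by server 1
    List<Long> segment2 = List.of(1L, 3L, 5L);  // rows served by server 2
    // The broker may concatenate per-server results in either order...
    List<Long> merged = new ArrayList<>(segment1);
    merged.addAll(segment2);                    // arrival order: 0,2,4,1,3,5
    // ...so row-by-row assertions only hold after an explicit sort,
    // the moral equivalent of ORDER BY tsBase.
    merged.sort(Comparator.naturalOrder());
    System.out.println(merged);                 // [0, 1, 2, 3, 4, 5]
  }
}
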
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
index dac31032cd2..e7408295f11 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
@@ -23,10 +23,8 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
-import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.datasketches.tuple.Intersection;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
@@ -289,21 +287,18 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
new org.apache.avro.Schema.Field(MET_TUPLE_SKETCH_BYTES,
org.apache.avro.Schema.create(
org.apache.avro.Schema.Type.BYTES), null, null)));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- Random random = new Random();
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(ID, random.nextInt(10));
+ record.put(ID, RANDOM.nextInt(10));
record.put(MET_TUPLE_SKETCH_BYTES,
ByteBuffer.wrap(getRandomRawValue()));
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
private byte[] getRandomRawValue() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
index 0c294e7de8e..18fd9296ba2 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
@@ -24,10 +24,8 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
-import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -55,7 +53,7 @@ public class ULLTest extends
CustomDataQueryClusterIntegrationTest {
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query = String.format("SELECT DISTINCT_COUNT_ULL(%s),
DISTINCT_COUNT_RAW_ULL(%s) FROM %s",
- COLUMN, COLUMN, getTableName());
+ COLUMN, COLUMN, getTableName());
JsonNode jsonNode = postQuery(query);
long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
@@ -69,13 +67,18 @@ public class ULLTest extends
CustomDataQueryClusterIntegrationTest {
public void testUnionWithSketchQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-
String query = String.format(
- "SELECT " + "DISTINCT_COUNT_ULL(%s), " + "DISTINCT_COUNT_RAW_ULL(%s) "
+ "FROM " + "("
- + "SELECT %s FROM %s WHERE %s = 4 " + "UNION ALL " + "SELECT %s
FROM %s WHERE %s = 5 " + "UNION ALL "
- + "SELECT %s FROM %s WHERE %s = 6 " + "UNION ALL " + "SELECT %s
FROM %s WHERE %s = 7 " + ")",
- COLUMN, COLUMN, COLUMN, getTableName(), ID, COLUMN,
- getTableName(), ID, COLUMN, getTableName(), ID, COLUMN,
getTableName(), ID);
+ "SELECT " + "DISTINCT_COUNT_ULL(%s), " + "DISTINCT_COUNT_RAW_ULL(%s) "
+ + "FROM " + "("
+ + "SELECT %s FROM %s WHERE %s = 4 " + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 5 " + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 6 " + "UNION ALL "
+ + "SELECT %s FROM %s WHERE %s = 7 " + ")",
+ COLUMN, COLUMN,
+ COLUMN, getTableName(), ID,
+ COLUMN, getTableName(), ID,
+ COLUMN, getTableName(), ID,
+ COLUMN, getTableName(), ID);
JsonNode jsonNode = postQuery(query);
long distinctCount =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
byte[] rawSketchBytes =
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
@@ -132,21 +135,18 @@ public class ULLTest extends
CustomDataQueryClusterIntegrationTest {
null), new org.apache.avro.Schema.Field(COLUMN,
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES),
null, null)));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- Random random = new Random();
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(ID, random.nextInt(10));
+ record.put(ID, RANDOM.nextInt(10));
record.put(COLUMN, ByteBuffer.wrap(getRandomRawValue()));
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
private byte[] getRandomRawValue() {
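
The ULLTest data above exercises the p-mismatch fix described in the commit message: sketches built from a bytes column may carry a different precision than the default, and merging then requires aligning precisions first. A sketch of that idea; the hash4j calls used here (create, add, downsize, getP, getDistinctCountEstimate) reflect my reading of that API and should be treated as assumptions, as should the precision values:

import com.dynatrace.hash4j.distinctcount.UltraLogLog;

public class UllMergeDemo {
  public static void main(String[] args) {
    UltraLogLog a = UltraLogLog.create(12);  // sketch built with one precision
    UltraLogLog b = UltraLogLog.create(14);  // sketch deserialized with another
    a.add(0x9e3779b97f4a7c15L);              // pre-hashed values, for illustration
    b.add(0xc2b2ae3d27d4eb4fL);
    // Align precisions before merging instead of assuming the default p:
    // downsize both to the smaller p, then fold one into the other.
    int minP = Math.min(a.getP(), b.getP());
    UltraLogLog merged = a.downsize(minP);
    merged.add(b.downsize(minP));
    System.out.println(merged.getDistinctCountEstimate());
  }
}
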
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
index 1587aa382a9..87cd6f33ac2 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
@@ -19,13 +19,10 @@
package org.apache.pinot.integration.tests.custom;
import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import java.io.File;
import java.util.List;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -366,32 +363,25 @@ public class UnnestIntegrationTest extends
CustomDataQueryClusterIntegrationTest
null, null)
));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- Cache<Integer, GenericData.Record> recordCache =
CacheBuilder.newBuilder().build();
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
+ // create avro record
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(INT_COLUMN, i);
+ record.put(LONG_COLUMN, i);
+ record.put(FLOAT_COLUMN, i + RANDOM.nextFloat());
+ record.put(DOUBLE_COLUMN, i + RANDOM.nextDouble());
+ record.put(STRING_COLUMN, RandomStringUtils.insecure().next(i));
+ record.put(TIMESTAMP_COLUMN, i);
+ record.put(GROUP_BY_COLUMN, String.valueOf(i % 10));
+ record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
+ record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
+ record.put(STRING_ARRAY_COLUMN, List.of("a", "b", "c"));
// add avro record to file
- int finalI = i;
- fileWriter.append(recordCache.get(i, () -> {
- // create avro record
- GenericData.Record record = new GenericData.Record(avroSchema);
- record.put(INT_COLUMN, finalI);
- record.put(LONG_COLUMN, finalI);
- record.put(FLOAT_COLUMN, finalI + RANDOM.nextFloat());
- record.put(DOUBLE_COLUMN, finalI + RANDOM.nextDouble());
- record.put(STRING_COLUMN,
RandomStringUtils.insecure().next(finalI));
- record.put(TIMESTAMP_COLUMN, finalI);
- record.put(GROUP_BY_COLUMN, String.valueOf(finalI % 10));
- record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
- record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
- record.put(STRING_ARRAY_COLUMN, List.of("a", "b", "c"));
- return record;
- }
- ));
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
}
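
The Guava cache removed from UnnestIntegrationTest was keyed by the loop index, which never repeats, so every get() invoked the loader and the cache added indirection without ever hitting; building the record inline is behaviorally identical. A small demonstration (the cache stats come back with zero hits):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.ExecutionException;

public class UselessCacheDemo {
  public static void main(String[] args) throws ExecutionException {
    Cache<Integer, String> cache = CacheBuilder.newBuilder().recordStats().build();
    for (int i = 0; i < 10; i++) {
      final int key = i;
      cache.get(key, () -> "record-" + key);  // loader runs on every iteration
    }
    System.out.println(cache.stats().hitCount());  // 0 -- the cache never helped
  }
}
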
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
index 2cbee3a2c2e..5f9b0a25d02 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
@@ -24,11 +24,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.stream.IntStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.function.scalar.VectorFunctions;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -291,10 +289,8 @@ public class VectorTest extends
CustomDataQueryClusterIntegrationTest {
null, null)
));
- // create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
for (int i = 0; i < getCountStarResult(); i++) {
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
@@ -316,10 +312,10 @@ public class VectorTest extends
CustomDataQueryClusterIntegrationTest {
record.put(VECTOR_ZERO_L2_DIST, VectorFunctions.l2Distance(vector1,
zeroVector));
// add avro record to file
- fileWriter.append(record);
+ writers.get(i % getNumAvroFiles()).append(record);
}
+ return avroFilesAndWriters.getAvroFiles();
}
- return List.of(avroFile);
}
private float[] createZeroVector(int vectorDimSize) {
@@ -330,9 +326,8 @@ public class VectorTest extends
CustomDataQueryClusterIntegrationTest {
private float[] createRandomVector(int vectorDimSize) {
float[] vector = new float[vectorDimSize];
- Random random = new Random();
for (int i = 0; i < vectorDimSize; i++) {
- vector[i] = random.nextFloat();
+ vector[i] = RANDOM.nextFloat();
}
return vector;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index cd93002c02d..612847e0270 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.File;
import java.util.List;
-import java.util.Random;
import org.apache.pinot.integration.tests.window.utils.WindowFunnelUtils;
import org.apache.pinot.spi.data.Schema;
import org.testng.annotations.Test;
@@ -472,7 +471,7 @@ public class WindowFunnelTest extends
CustomDataQueryClusterIntegrationTest {
public void testFunnelMatchStepWithMultiThreadsReduce(boolean
useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
- int numThreadsExtractFinalResult = 2 + new Random().nextInt(10);
+ int numThreadsExtractFinalResult = 2 + RANDOM.nextInt(10);
LOGGER.info("Running testFunnelMatchStepWithMultiThreadsReduce with
numThreadsExtractFinalResult: {}",
numThreadsExtractFinalResult);
String query =
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
index 678dee8a3ee..02dd39d0778 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
@@ -30,6 +30,7 @@ import org.codehaus.jettison.json.JSONException;
public class TPCHQueryGeneratorV2 {
+ private static final Random RANDOM = new Random();
private static final Map<String, Table> TABLES_MAP = new HashMap<>();
private static final List<String> TABLE_NAMES =
List.of("nation", "region", "supplier", "customer", "part", "partsupp",
"orders", "lineitem");
@@ -48,8 +49,7 @@ public class TPCHQueryGeneratorV2 {
}
private static Table getRandomTable() {
- Random random = new Random();
- int index = random.nextInt(TABLES_MAP.size());
+ int index = RANDOM.nextInt(TABLES_MAP.size());
return TABLES_MAP.get(TABLE_NAMES.get(index));
}
@@ -138,12 +138,11 @@ public class TPCHQueryGeneratorV2 {
}
private List<String> getRandomProjections(Table t1) {
- Random random = new Random();
- int numColumns = random.nextInt(t1.getColumns().size()) + 1;
+ int numColumns = RANDOM.nextInt(t1.getColumns().size()) + 1;
List<String> selectedColumns = new ArrayList<>();
while (selectedColumns.size() < numColumns) {
- String columnName =
t1.getColumns().get(random.nextInt(t1.getColumns().size())).getColumnName();
+ String columnName =
t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size())).getColumnName();
if (!selectedColumns.contains(columnName)) {
selectedColumns.add(columnName);
}
@@ -155,12 +154,11 @@ public class TPCHQueryGeneratorV2 {
private String generateInnerQueryForPredicate(Table t1, Column c) {
QuerySkeleton innerQuery = new QuerySkeleton();
- Random random = new Random();
List<String> predicates = new ArrayList<>();
innerQuery.addTable(t1.getTableName());
// Limit to maximum of 1 join
- if (random.nextBoolean()) {
- RelatedTable relatedTable =
t1.getRelatedTables().get(random.nextInt(t1.getRelatedTables().size()));
+ if (RANDOM.nextBoolean()) {
+ RelatedTable relatedTable =
t1.getRelatedTables().get(RANDOM.nextInt(t1.getRelatedTables().size()));
if (relatedTable != null) {
innerQuery.addTable(relatedTable.getForeignTableName());
predicates.add(relatedTable.getLocalTableKey() + "=" +
relatedTable.getForeignTableKey());
@@ -169,7 +167,7 @@ public class TPCHQueryGeneratorV2 {
predicates.addAll(inp);
}
}
- String aggregation =
c.getColumnType()._aggregations.get(random.nextInt(c.getColumnType()._aggregations.size()));
+ String aggregation =
c.getColumnType()._aggregations.get(RANDOM.nextInt(c.getColumnType()._aggregations.size()));
innerQuery.addProjection(aggregation + "(" + c.getColumnName() + ")");
List<String> inp = getRandomPredicates(t1, false);
@@ -181,8 +179,7 @@ public class TPCHQueryGeneratorV2 {
}
private String getRandomValueForPredicate(Table t1, Column c, boolean
useNextedQueries) {
- Random random = new Random();
- if (random.nextBoolean() && useNextedQueries &&
c.getColumnType()._aggregations.size() > 0) {
+ if (RANDOM.nextBoolean() && useNextedQueries &&
c.getColumnType()._aggregations.size() > 0) {
// Use nested query for predicate
String nestedQueries = generateInnerQueryForPredicate(t1, c);
return "(" + nestedQueries + ")";
@@ -196,15 +193,14 @@ public class TPCHQueryGeneratorV2 {
}
private List<String> getRandomPredicates(Table t1, boolean useNestedQueries)
{
- Random random = new Random();
- int predicateCount = random.nextInt(5) + 1;
+ int predicateCount = RANDOM.nextInt(5) + 1;
List<String> predicates = new ArrayList<>();
List<String> results = new ArrayList<>();
while (predicates.size() < predicateCount) {
- Column column =
t1.getColumns().get(random.nextInt(t1.getColumns().size()));
+ Column column =
t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size()));
predicates.add(column.getColumnName());
ColumnType columnType = column.getColumnType();
- String operator =
columnType._operators.get(random.nextInt(columnType._operators.size()));
+ String operator =
columnType._operators.get(RANDOM.nextInt(columnType._operators.size()));
String value = getRandomValueForPredicate(t1, column, useNestedQueries);
String predicateBuilder = column.getColumnName() + " " + operator + " "
+ value + " ";
results.add(predicateBuilder);
@@ -218,17 +214,16 @@ public class TPCHQueryGeneratorV2 {
}
private List<String> getRandomOrderBys(Table t1) {
- Random random = new Random();
- int orderByCount = random.nextInt(2) + 1;
+ int orderByCount = RANDOM.nextInt(2) + 1;
List<String> orderBys = new ArrayList<>();
List<String> results = new ArrayList<>();
while (orderBys.size() < orderByCount) {
- Column column =
t1.getColumns().get(random.nextInt(t1.getColumns().size()));
+ Column column =
t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size()));
orderBys.add(column.getColumnName());
String name = column.getColumnName();
StringBuilder orderByBuilder = new StringBuilder();
orderByBuilder.append(name).append(" ");
- if (random.nextBoolean()) {
+ if (RANDOM.nextBoolean()) {
orderByBuilder.append(" DESC ");
}
results.add(orderByBuilder.toString());
@@ -261,15 +256,14 @@ public class TPCHQueryGeneratorV2 {
if (groupByCols.size() == 0) {
return result;
}
- Random random = new Random();
List<String> orderBys = new ArrayList<>();
- int orderByCount = random.nextInt(groupByCols.size()) + 1;
+ int orderByCount = RANDOM.nextInt(groupByCols.size()) + 1;
while (orderBys.size() < orderByCount) {
- String column = groupByCols.get(random.nextInt(groupByCols.size()));
+ String column = groupByCols.get(RANDOM.nextInt(groupByCols.size()));
if (groupByCols.contains(column)) {
orderBys.add(column);
- if (random.nextBoolean()) {
+ if (RANDOM.nextBoolean()) {
result.add(column + " DESC");
} else {
result.add(column);
@@ -291,14 +285,13 @@ public class TPCHQueryGeneratorV2 {
}
}
- Random random = new Random();
- RelatedTable rt =
t1.getRelatedTables().get(random.nextInt(t1.getRelatedTables().size()));
+ RelatedTable rt =
t1.getRelatedTables().get(RANDOM.nextInt(t1.getRelatedTables().size()));
Table t2 = TABLES_MAP.get(rt.getForeignTableName());
getRandomProjections(t1).forEach(querySkeleton::addProjection);
getRandomProjections(t2).forEach(querySkeleton::addProjection);
String t2NameWithJoin =
- t1.getTableName() + " " +
JOIN_TYPES[random.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
+ t1.getTableName() + " " +
JOIN_TYPES[RANDOM.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
+ rt.getLocalTableKey() + " = " + rt.getForeignTableKey() + " ";
querySkeleton.addTable(t2NameWithJoin);
@@ -318,20 +311,19 @@ public class TPCHQueryGeneratorV2 {
}
private Pair<List<String>, List<String>> getGroupByAndAggregates(Table t1) {
- Random random = new Random();
- int numColumns = random.nextInt(t1.getColumns().size()) + 1;
+ int numColumns = RANDOM.nextInt(t1.getColumns().size()) + 1;
List<String> selectedColumns = new ArrayList<>();
List<String> groupByColumns = new ArrayList<>();
List<String> resultProjections = new ArrayList<>();
while (selectedColumns.size() < numColumns) {
- Column column =
t1.getColumns().get(random.nextInt(t1.getColumns().size()));
+ Column column =
t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size()));
String columnName = column.getColumnName();
if (!selectedColumns.contains(columnName)) {
- if (random.nextBoolean() &&
column.getColumnType()._aggregations.size() > 0) {
+ if (RANDOM.nextBoolean() &&
column.getColumnType()._aggregations.size() > 0) {
// Use as aggregation
String aggregation =
-
column.getColumnType()._aggregations.get(random.nextInt(column.getColumnType()._aggregations.size()));
+
column.getColumnType()._aggregations.get(RANDOM.nextInt(column.getColumnType()._aggregations.size()));
resultProjections.add(aggregation + "(" + columnName + ")");
} else {
// Use as group by
@@ -377,8 +369,7 @@ public class TPCHQueryGeneratorV2 {
}
}
- Random random = new Random();
- RelatedTable rt =
t1.getRelatedTables().get(random.nextInt(t1.getRelatedTables().size()));
+ RelatedTable rt =
t1.getRelatedTables().get(RANDOM.nextInt(t1.getRelatedTables().size()));
Table t2 = TABLES_MAP.get(rt.getForeignTableName());
Pair<List<String>, List<String>> groupByColumns =
getGroupByAndAggregates(t1);
groupByColumns.getLeft().forEach(querySkeleton::addProjection);
@@ -387,7 +378,7 @@ public class TPCHQueryGeneratorV2 {
groupByColumnsT2.getLeft().forEach(querySkeleton::addProjection);
String tName =
- t1.getTableName() + " " +
JOIN_TYPES[random.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
+ t1.getTableName() + " " +
JOIN_TYPES[RANDOM.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
+ " " + rt.getLocalTableKey() + " = " + " " +
rt.getForeignTableKey() + " ";
querySkeleton.addTable(tName);
@@ -415,7 +406,6 @@ public class TPCHQueryGeneratorV2 {
List<Table> tables = new ArrayList<>();
Set<String> tableNames = new HashSet<>();
- Random random = new Random();
// Start off with a random table with related tables
while (true) {
@@ -428,10 +418,10 @@ public class TPCHQueryGeneratorV2 {
}
// Add more tables
- while (random.nextInt() % 8 != 0) {
- int tableToAddIdx = random.nextInt(tables.size());
+ while (RANDOM.nextInt() % 8 != 0) {
+ int tableToAddIdx = RANDOM.nextInt(tables.size());
RelatedTable relatedTable = tables.get(tableToAddIdx).getRelatedTables()
-
.get(random.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
+
.get(RANDOM.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
if (!tableNames.contains(relatedTable.getForeignTableName())) {
tableNames.add(relatedTable.getForeignTableName());
tables.add(TPCHQueryGeneratorV2.TABLES_MAP.get(relatedTable.getForeignTableName()));
@@ -475,7 +465,6 @@ public class TPCHQueryGeneratorV2 {
List<Table> tables = new ArrayList<>();
Set<String> tableNames = new HashSet<>();
- Random random = new Random();
// Start off with a random table with related tables
while (true) {
@@ -488,10 +477,10 @@ public class TPCHQueryGeneratorV2 {
}
// Add more tables
- while (random.nextInt() % 8 != 0) {
- int tableToAddIdx = random.nextInt(tables.size());
+ while (RANDOM.nextInt() % 8 != 0) {
+ int tableToAddIdx = RANDOM.nextInt(tables.size());
RelatedTable relatedTable = tables.get(tableToAddIdx).getRelatedTables()
-
.get(random.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
+
.get(RANDOM.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
if (!tableNames.contains(relatedTable.getForeignTableName())) {
tableNames.add(relatedTable.getForeignTableName());
tables.add(TPCHQueryGeneratorV2.TABLES_MAP.get(relatedTable.getForeignTableName()));
@@ -529,9 +518,8 @@ public class TPCHQueryGeneratorV2 {
}
public String generateRandomQuery() {
- Random random = new Random();
- int queryType = random.nextInt(6);
- boolean includePredicates = random.nextBoolean();
+ int queryType = RANDOM.nextInt(6);
+ boolean includePredicates = RANDOM.nextBoolean();
boolean includeOrderBy = true;
switch (queryType) {
case 0:
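
The TPCHQueryGeneratorV2 changes are a pure refactor: every method previously constructed its own new Random(), and the patch hoists a single shared RANDOM constant, matching the pattern the test classes already use. Condensed before/after, with hypothetical method names:

import java.util.Random;

public class RandomUsage {
  // Before: a fresh, freshly-seeded generator on every call.
  static int pickOld(int bound) {
    return new Random().nextInt(bound);
  }

  // After: one shared generator for the whole class. Avoids per-call
  // allocation and gives a single place to plug in a fixed seed if the
  // generated queries ever need to be reproducible.
  private static final Random RANDOM = new Random();

  static int pickNew(int bound) {
    return RANDOM.nextInt(bound);
  }

  public static void main(String[] args) {
    System.out.println(pickOld(6) + " " + pickNew(6));
  }
}
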
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]