This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf9095c68c Improve CustomDataQueryClusterIntegrationTest to use 2 
servers and 2 segments to ensure the behavior of merge is expected. (#17284)
9cf9095c68c is described below

commit 9cf9095c68c709bd675dbff3571a48ba0f741059
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Dec 11 03:20:55 2025 -0800

    Improve CustomDataQueryClusterIntegrationTest to use 2 servers and 2 
segments to ensure the behavior of merge is expected. (#17284)
    
    1. Update the entire CustomDataQueryClusterIntegrationTest test suite to use 2
servers and 2 segments so that query behaviors such as intermediate-result
serde, merge, and exchange are exercised as expected.
    2. Change the SumArrayLong and SumArrayDouble aggregation functions'
intermediate result type to OBJECT so that the DataTable serde works
automatically for them.
    3. Fix the StUnion object merge issue.
    4. Fix the ULL merge issue when aggregating on a bytes column: stored sketches
may use a different p, and returning a ULL object with the default p would make
the merge fail (see the sketch after this list).
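
A minimal, hypothetical sketch of the ordering rule behind item 4 (the class name
and the p values are illustrative, not taken from the commit): hash4j's UltraLogLog
can only absorb a sketch whose precision p is greater than or equal to its own, so
the merge must always add the higher-p sketch into the lower-p one.

import com.dynatrace.hash4j.distinctcount.UltraLogLog;

public class UllMergeSketch {
  // Mirrors the null- and precision-aware ordering of the new merge() below.
  static UltraLogLog merge(UltraLogLog a, UltraLogLog b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    // Always add the larger-p sketch into the smaller-p (or equal-p) one.
    return a.getP() > b.getP() ? b.add(a) : a.add(b);
  }

  public static void main(String[] args) {
    UltraLogLog fromBytesColumn = UltraLogLog.create(12); // p carried by serialized bytes
    UltraLogLog defaultSketch = UltraLogLog.create(8);    // sketch built with the default p
    UltraLogLog merged = merge(defaultSketch, fromBytesColumn);
    System.out.println("merged p = " + merged.getP()); // prints 8: the smaller p wins
  }
}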
---
 .../DistinctCountULLAggregationFunction.java       |  26 ++--
 .../function/StUnionAggregationFunction.java       |  37 +++++-
 .../array/SumArrayDoubleAggregationFunction.java   |   5 +-
 .../array/SumArrayLongAggregationFunction.java     |   5 +-
 .../function/StUnionAggregationFunctionTest.java   | 106 +++++++++++++++
 .../tests/CLPEncodingRealtimeIntegrationTest.java  |   4 +-
 .../OfflineTimestampIndexIntegrationTest.java      |   5 +-
 .../pinot/integration/tests/custom/ArrayTest.java  | 146 +++++++++++----------
 .../integration/tests/custom/BytesTypeTest.java    |  10 +-
 .../integration/tests/custom/CpcSketchTest.java    |  15 +--
 .../CustomDataQueryClusterIntegrationTest.java     |  55 ++++++++
 .../tests/custom/FloatingPointDataTypeTest.java    |  15 +--
 .../integration/tests/custom/GeoSpatialTest.java   |  12 +-
 .../integration/tests/custom/JsonPathTest.java     |  58 ++++----
 .../tests/custom/MapFieldTypeRealtimeTest.java     |  10 +-
 .../integration/tests/custom/MapFieldTypeTest.java |  71 +++++-----
 .../integration/tests/custom/MapTypeTest.java      |  93 ++++++-------
 .../tests/custom/MultiColumnTextIndicesTest.java   |  10 +-
 .../integration/tests/custom/SumPrecisionTest.java |  19 +--
 .../integration/tests/custom/TextIndicesTest.java  |  10 +-
 .../integration/tests/custom/ThetaSketchTest.java  |  22 ++--
 .../integration/tests/custom/TimestampTest.java    |  17 +--
 .../integration/tests/custom/TupleSketchTest.java  |  15 +--
 .../pinot/integration/tests/custom/ULLTest.java    |  34 ++---
 .../tests/custom/UnnestIntegrationTest.java        |  42 +++---
 .../pinot/integration/tests/custom/VectorTest.java |  15 +--
 .../integration/tests/custom/WindowFunnelTest.java |   3 +-
 .../tests/tpch/generator/TPCHQueryGeneratorV2.java |  78 +++++------
 28 files changed, 547 insertions(+), 391 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
index 1a036ae5342..edd3fcc41ed 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountULLAggregationFunction.java
@@ -22,6 +22,7 @@ import com.dynatrace.hash4j.distinctcount.UltraLogLog;
 import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -305,11 +306,12 @@ public class DistinctCountULLAggregationFunction extends 
BaseSingleInputAggregat
     }
   }
 
+  @Nullable
   @Override
   public UltraLogLog extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     Object result = aggregationResultHolder.getResult();
     if (result == null) {
-      return UltraLogLog.create(_p);
+      return null;
     }
 
     if (result instanceof DictIdsWrapper) {
@@ -321,11 +323,12 @@ public class DistinctCountULLAggregationFunction extends 
BaseSingleInputAggregat
     }
   }
 
+  @Nullable
   @Override
   public UltraLogLog extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     Object result = groupByResultHolder.getResult(groupKey);
     if (result == null) {
-      return UltraLogLog.create(_p);
+      return null;
     }
 
     if (result instanceof DictIdsWrapper) {
@@ -337,13 +340,20 @@ public class DistinctCountULLAggregationFunction extends 
BaseSingleInputAggregat
     }
   }
 
+  @Nullable
   @Override
-  public UltraLogLog merge(UltraLogLog intermediateResult1, UltraLogLog 
intermediateResult2) {
-    int largerP = Math.max(intermediateResult1.getP(), 
intermediateResult2.getP());
-    UltraLogLog merged = UltraLogLog.create(largerP);
-    merged.add(intermediateResult1);
-    merged.add(intermediateResult2);
-    return merged;
+  public UltraLogLog merge(@Nullable UltraLogLog intermediateResult1, 
@Nullable UltraLogLog intermediateResult2) {
+    if (intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+    // UltraLogLog object doesn't support merging a smaller p object into a 
larger p object.
+    if (intermediateResult1.getP() > intermediateResult2.getP()) {
+      return intermediateResult2.add(intermediateResult1);
+    }
+    return intermediateResult1.add(intermediateResult2);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
index 3864899f819..6632d5a62d3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
@@ -34,6 +35,8 @@ import org.apache.pinot.segment.local.utils.GeometryUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.util.GeometryCombiner;
+import org.locationtech.jts.operation.union.UnaryUnionOp;
 
 
 public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFunction<Geometry, ByteArray> {
@@ -63,8 +66,7 @@ public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFuncti
     byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV();
     Geometry geometry = aggregationResultHolder.getResult();
     for (int i = 0; i < length; i++) {
-      Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
-      geometry = geometry == null ? value : geometry.union(value);
+      geometry = union(geometry, 
GeometrySerializer.deserialize(bytesArray[i]));
     }
     aggregationResultHolder.setValue(geometry);
   }
@@ -77,7 +79,7 @@ public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFuncti
       int groupKey = groupKeyArray[i];
       Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
       Geometry geometry = groupByResultHolder.getResult(groupKey);
-      groupByResultHolder.setValueForKey(groupKey, geometry == null ? value : 
geometry.union(value));
+      groupByResultHolder.setValueForKey(groupKey, union(geometry, value));
     }
   }
 
@@ -89,7 +91,7 @@ public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFuncti
       Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
       for (int groupKey : groupKeysArray[i]) {
         Geometry geometry = groupByResultHolder.getResult(groupKey);
-        groupByResultHolder.setValueForKey(groupKey, geometry == null ? value 
: geometry.union(value));
+        groupByResultHolder.setValueForKey(groupKey, union(geometry, value));
       }
     }
   }
@@ -107,8 +109,8 @@ public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFuncti
   }
 
   @Override
-  public Geometry merge(Geometry intermediateResult1, Geometry 
intermediateResult2) {
-    return intermediateResult1.union(intermediateResult2);
+  public Geometry merge(@Nullable Geometry intermediateResult1, @Nullable 
Geometry intermediateResult2) {
+    return union(intermediateResult1, intermediateResult2);
   }
 
   @Override
@@ -136,4 +138,27 @@ public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFuncti
   public ByteArray extractFinalResult(Geometry geometry) {
     return new ByteArray(GeometrySerializer.serialize(geometry));
   }
+
+  /**
+   * Returns the union of the supplied geometries.
+   *
+   * <p>When either operand is a {@code GeometryCollection}, {@link 
Geometry#union(Geometry)} can produce invalid
+   * topologies or drop components because it expects homogeneous inputs.  The 
{@link UnaryUnionOp} implementation is
+   * purpose-built for arbitrary collections, so we first combine the 
components and delegate to it to ensure a valid
+   * and deterministic result.</p>
+   */
+  @Nullable
+  private static Geometry union(@Nullable Geometry left, @Nullable Geometry 
right) {
+    if (left == null) {
+      return right;
+    }
+    if (right == null) {
+      return left;
+    }
+    if (Geometry.TYPENAME_GEOMETRYCOLLECTION.equals(left.getGeometryType())
+        || 
Geometry.TYPENAME_GEOMETRYCOLLECTION.equals(right.getGeometryType())) {
+      return UnaryUnionOp.union(GeometryCombiner.combine(left, right));
+    }
+    return left.union(right);
+  }
 }
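
For reference, a standalone sketch (class name and WKT literals are made up, not
part of the commit) of what the new union(...) helper guards against: JTS
Geometry#union(Geometry) expects homogeneous operands, so when either side is a
GEOMETRYCOLLECTION the helper first combines the operands and delegates to
UnaryUnionOp.

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.util.GeometryCombiner;
import org.locationtech.jts.io.WKTReader;
import org.locationtech.jts.operation.union.UnaryUnionOp;

public class GeometryCollectionUnionSketch {
  public static void main(String[] args) throws Exception {
    WKTReader reader = new WKTReader();
    Geometry polygon = reader.read("POLYGON ((0 0, 0 5, 5 5, 5 0, 0 0))");
    Geometry collection =
        reader.read("GEOMETRYCOLLECTION (POINT (30 30), LINESTRING (35 35, 40 40))");

    // Combine first, then union, so every component survives and the result is
    // valid and deterministic regardless of the operand types.
    Geometry union = UnaryUnionOp.union(GeometryCombiner.combine(polygon, collection));
    System.out.println(union);
  }
}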
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
index df923c92d79..c67ecab1b43 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayDoubleAggregationFunction.java
@@ -138,7 +138,10 @@ public class SumArrayDoubleAggregationFunction
 
   @Override
   public DataSchema.ColumnDataType getIntermediateResultColumnType() {
-    return DataSchema.ColumnDataType.DOUBLE_ARRAY;
+    // AggregationResultsBlock#setIntermediateResult and 
AggregationFunctionUtils#getIntermediateResult only serialize
+    // intermediate results whose stored type is 
INT/LONG/DOUBLE/STRING/OBJECT, so we need the OBJECT/custom ser-de path
+    // for DoubleArrayList payloads.
+    return DataSchema.ColumnDataType.OBJECT;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
index 0b3d3094e06..487504b9057 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/SumArrayLongAggregationFunction.java
@@ -148,7 +148,10 @@ public class SumArrayLongAggregationFunction extends 
BaseSingleInputAggregationF
 
   @Override
   public DataSchema.ColumnDataType getIntermediateResultColumnType() {
-    return DataSchema.ColumnDataType.LONG_ARRAY;
+    // AggregationResultsBlock#setIntermediateResult and 
AggregationFunctionUtils#getIntermediateResult only support
+    // INT/LONG/DOUBLE/STRING/OBJECT stored types, so LongArrayList must use 
OBJECT to piggyback on the custom ser-de
+    // implemented in ObjectSerDeUtils.
+    return DataSchema.ColumnDataType.OBJECT;
   }
 
   @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunctionTest.java
new file mode 100644
index 00000000000..79f0acfabe2
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunctionTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.SyntheticBlockValSets;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.segment.local.utils.GeometrySerializer;
+import org.apache.pinot.segment.local.utils.GeometryUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.util.GeometryCombiner;
+import org.locationtech.jts.operation.union.UnaryUnionOp;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class StUnionAggregationFunctionTest {
+  @Test
+  public void testMergeHandlesGeometryCollectionInputs()
+      throws Exception {
+    ExpressionContext expression = 
ExpressionContext.forIdentifier("geometryColumn");
+    StUnionAggregationFunction aggregationFunction = new 
StUnionAggregationFunction(List.of(expression));
+    Geometry polygon = GeometryUtils.GEOMETRY_WKT_READER.read("POLYGON((0 0, 0 
5, 5 5, 5 0, 0 0))");
+    Geometry line = GeometryUtils.GEOMETRY_WKT_READER.read("LINESTRING(20 20, 
25 25)");
+    Geometry geometryCollection =
+        GeometryUtils.GEOMETRY_WKT_READER.read("GEOMETRYCOLLECTION (POINT (30 
30), LINESTRING (35 35, 40 40))");
+
+    Geometry intermediate = aggregationFunction.merge(polygon, line);
+    Geometry result = aggregationFunction.merge(intermediate, 
geometryCollection);
+
+    Geometry expected = UnaryUnionOp.union(GeometryCombiner.combine(polygon, 
line, geometryCollection));
+    assertEquals(result, expected);
+  }
+
+  @Test
+  public void testAggregateHandlesMixedDimensionSequence()
+      throws Exception {
+    ExpressionContext expression = 
ExpressionContext.forIdentifier("geometryColumn");
+    StUnionAggregationFunction aggregationFunction = new 
StUnionAggregationFunction(List.of(expression));
+
+    Geometry polygon = GeometryUtils.GEOMETRY_WKT_READER.read("POLYGON((0 0, 0 
5, 5 5, 5 0, 0 0))");
+    Geometry line = GeometryUtils.GEOMETRY_WKT_READER.read("LINESTRING(10 10, 
15 15)");
+    Geometry geometryCollection =
+        GeometryUtils.GEOMETRY_WKT_READER.read("GEOMETRYCOLLECTION (POINT (20 
20), LINESTRING (25 25, 30 30))");
+
+    byte[][] values = new byte[][]{
+        GeometrySerializer.serialize(polygon),
+        GeometrySerializer.serialize(line),
+        GeometrySerializer.serialize(geometryCollection)
+    };
+
+    AggregationResultHolder aggregationResultHolder = 
aggregationFunction.createAggregationResultHolder();
+    Map<ExpressionContext, BlockValSet> blockValSetMap =
+        Collections.singletonMap(expression, new BytesBlockValSet(values));
+
+    aggregationFunction.aggregate(values.length, aggregationResultHolder, 
blockValSetMap);
+
+    Geometry expected = UnaryUnionOp.union(GeometryCombiner.combine(polygon, 
line, geometryCollection));
+    assertEquals(aggregationResultHolder.getResult(), expected);
+  }
+
+  private static class BytesBlockValSet extends SyntheticBlockValSets.Base {
+    private final byte[][] _values;
+
+    private BytesBlockValSet(byte[][] values) {
+      _values = values;
+    }
+
+    @Override
+    public FieldSpec.DataType getValueType() {
+      return FieldSpec.DataType.BYTES;
+    }
+
+    @Override
+    public boolean isSingleValue() {
+      return true;
+    }
+
+    @Override
+    public byte[][] getBytesValuesSV() {
+      return _values;
+    }
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
index cc762327ceb..dab36f2c1a9 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,7 +37,6 @@ import org.testng.annotations.Test;
 public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTestSet {
   private List<File> _avroFiles;
   private FieldConfig.CompressionCodec _selectedCompressionCodec;
-  private final Random _random = new Random();
 
   @BeforeClass
   public void setUp()
@@ -48,7 +46,7 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
 
     // Randomly select CLP or CLPV2 compression codec
     _selectedCompressionCodec =
-        _random.nextBoolean() ? FieldConfig.CompressionCodec.CLP : 
FieldConfig.CompressionCodec.CLPV2;
+        RANDOM.nextBoolean() ? FieldConfig.CompressionCodec.CLP : 
FieldConfig.CompressionCodec.CLPV2;
 
     // Start the Pinot cluster
     startZk();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/OfflineTimestampIndexIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineTimestampIndexIntegrationTest.java
similarity index 97%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/OfflineTimestampIndexIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineTimestampIndexIntegrationTest.java
index 180b1abf1ad..1c040ebefdd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/OfflineTimestampIndexIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineTimestampIndexIntegrationTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests.custom;
+package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
@@ -24,9 +24,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
-import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TimestampConfig;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
index 6ddc610a087..80349aaa192 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.util.List;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -234,32 +233,40 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
 
     query =
         String.format("SELECT "
-            + "listAgg(stringCol, ' | ') WITHIN GROUP (ORDER BY stringCol) "
-            + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+            + "listAgg(stringCol, ' | ') WITHIN GROUP (ORDER BY stringCol), 
intCol "
+            + "FROM %s GROUP BY intCol LIMIT %d", getTableName(), 
getCountStarResult());
     jsonNode = postQuery(query);
     rows = jsonNode.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    row = rows.get(0);
-    assertEquals(row.size(), 1);
-    String[] splits = row.get(0).asText().split(" \\| ");
-    assertEquals(splits.length, getCountStarResult());
-    for (int i = 1; i < splits.length; i++) {
-      assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+    assertEquals(rows.size(), getCountStarResult() / 10);
+    for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+      row = rows.get(rowId);
+      assertEquals(row.size(), 2);
+      String[] splits = row.get(0).asText().split(" \\| ");
+      if (splits.length > 0) {
+        assertEquals(splits.length, 10);
+        for (int i = 1; i < splits.length; i++) {
+          assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+        }
+      }
     }
 
     query =
         String.format("SELECT "
-            + "listAgg(cast(doubleCol AS VARCHAR), ' | ') WITHIN GROUP (ORDER 
BY doubleCol) "
-            + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+            + "listAgg(cast(doubleCol AS VARCHAR), ' | ') WITHIN GROUP (ORDER 
BY doubleCol), stringCol "
+            + "FROM %s GROUP BY stringCol LIMIT %d", getTableName(), 
getCountStarResult());
     jsonNode = postQuery(query);
     rows = jsonNode.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    row = rows.get(0);
-    assertEquals(row.size(), 1);
-    splits = row.get(0).asText().split(" \\| ");
-    assertEquals(splits.length, getCountStarResult());
-    for (int i = 1; i < splits.length; i++) {
-      assertTrue(Double.parseDouble(splits[i]) >= Double.parseDouble(splits[i 
- 1]));
+    assertEquals(rows.size(), getCountStarResult() / 10);
+    for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+      row = rows.get(rowId);
+      assertEquals(row.size(), 2);
+      String[] splits = row.get(0).asText().split(" \\| ");
+      if (splits.length > 0) {
+        assertEquals(splits.length, 10);
+        for (int i = 1; i < splits.length; i++) {
+          assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+        }
+      }
     }
   }
 
@@ -341,32 +348,38 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
 
     query =
         String.format("SELECT "
-            + "listAgg(DISTINCT stringCol, ' | ') WITHIN GROUP (ORDER BY 
stringCol) "
-            + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+            + "listAgg(DISTINCT stringCol, ' | ') WITHIN GROUP (ORDER BY 
stringCol), intCol "
+            + "FROM %s GROUP BY intCol LIMIT %d", getTableName(), 
getCountStarResult());
     jsonNode = postQuery(query);
     rows = jsonNode.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    row = rows.get(0);
-    assertEquals(row.size(), 1);
-    String[] splits = row.get(0).asText().split(" \\| ");
-    assertEquals(splits.length, getCountStarResult() / 10);
-    for (int j = 1; j < splits.length; j++) {
-      assertTrue(splits[j].compareTo(splits[j - 1]) > 0);
+    assertEquals(rows.size(), getCountStarResult() / 10);
+    for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+      row = rows.get(rowId);
+      assertEquals(row.size(), 2);
+      String[] splits = row.get(0).asText().split(" \\| ");
+      if (splits.length > 1) {
+        assertEquals(splits.length, getCountStarResult());
+        for (int i = 1; i < splits.length; i++) {
+          assertTrue(splits[i].compareTo(splits[i - 1]) >= 0);
+        }
+      }
     }
 
     query =
         String.format("SELECT "
-            + "listAgg(DISTINCT cast(doubleCol AS VARCHAR), ' | ') WITHIN 
GROUP (ORDER BY doubleCol) "
-            + "FROM %s LIMIT %d", getTableName(), getCountStarResult());
+            + "listAgg(DISTINCT cast(doubleCol AS VARCHAR), ' | ') WITHIN 
GROUP (ORDER BY doubleCol), stringCol "
+            + "FROM %s GROUP BY stringCol LIMIT %d", getTableName(), 
getCountStarResult());
     jsonNode = postQuery(query);
     rows = jsonNode.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    row = rows.get(0);
-    assertEquals(row.size(), 1);
-    splits = row.get(0).asText().split(" \\| ");
-    assertEquals(splits.length, getCountStarResult() / 10);
-    for (int j = 1; j < splits.length; j++) {
-      assertTrue(Double.parseDouble(splits[j]) > Double.parseDouble(splits[j - 
1]));
+    assertEquals(rows.size(), getCountStarResult() / 10);
+    for (int rowId = 0; rowId < getCountStarResult() / 10; rowId++) {
+      row = rows.get(rowId);
+      assertEquals(row.size(), 2);
+      String[] splits = row.get(0).asText().split(" \\| ");
+      assertEquals(splits.length, 1);
+      for (int j = 1; j < splits.length; j++) {
+        assertTrue(Double.parseDouble(splits[j]) > Double.parseDouble(splits[j 
- 1]));
+      }
     }
   }
 
@@ -845,7 +858,7 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
             + "GENERATE_ARRAY(1, 3, -1) "
             + "FROM %s LIMIT 1", getTableName());
     JsonNode jsonNode = postQuery(query);
-    assertEquals(jsonNode.get("exceptions").size(), 1);
+    assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
   }
 
   @Test(dataProvider = "useV1QueryEngine")
@@ -894,7 +907,7 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
             + "GENERATE_ARRAY(2147483648, 2147483650, -1) "
             + "FROM %s LIMIT 1", getTableName());
     JsonNode jsonNode = postQuery(query);
-    assertEquals(jsonNode.get("exceptions").size(), 1);
+    assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
   }
 
   @Test(dataProvider = "useV1QueryEngine")
@@ -944,7 +957,7 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
             + "GENERATE_ARRAY(0.3, 0.1, 1.1) "
             + "FROM %s LIMIT 1", getTableName());
     JsonNode jsonNode = postQuery(query);
-    assertEquals(jsonNode.get("exceptions").size(), 1);
+    assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
   }
 
   @Test(dataProvider = "useV1QueryEngine")
@@ -994,7 +1007,7 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
             + "GENERATE_ARRAY(CAST(0.3 AS DOUBLE), CAST(0.1 AS DOUBLE), 
CAST(1.1 AS DOUBLE)) "
             + "FROM %s LIMIT 1", getTableName());
     JsonNode jsonNode = postQuery(query);
-    assertEquals(jsonNode.get("exceptions").size(), 1);
+    assertEquals(jsonNode.get("exceptions").size(), getNumAvroFiles());
   }
 
   @Test(dataProvider = "useBothQueryEngines")
@@ -1139,37 +1152,36 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
             null, null)
     ));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
     Cache<Integer, GenericData.Record> recordCache = 
CacheBuilder.newBuilder().build();
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // add avro record to file
         int finalI = i;
-        fileWriter.append(recordCache.get((int) (i % (getCountStarResult() / 
10)), () -> {
-              // create avro record
-              GenericData.Record record = new GenericData.Record(avroSchema);
-              record.put(BOOLEAN_COLUMN, finalI % 4 == 0 || finalI % 4 == 1);
-              record.put(BOOLEAN_FROM_INT_COLUMN, finalI % 4 == 0 || finalI % 
4 == 1 ? 1 : 0);
-              record.put(BOOLEAN_FROM_STRING_COLUMN, finalI % 4 == 0 || finalI 
% 4 == 1 ? "true" : "false");
-              record.put(INT_COLUMN, finalI);
-              record.put(LONG_COLUMN, finalI);
-              record.put(FLOAT_COLUMN, finalI + RANDOM.nextFloat());
-              record.put(DOUBLE_COLUMN, finalI + RANDOM.nextDouble());
-              record.put(STRING_COLUMN, RandomStringUtils.random(finalI));
-              record.put(TIMESTAMP_COLUMN, finalI);
-              record.put(GROUP_BY_COLUMN, String.valueOf(finalI % 10));
-              record.put(BOOLEAN_ARRAY_COLUMN, List.of(true, true, false, 
false));
-              record.put(BOOLEAN_FROM_INT_ARRAY_COLUMN, List.of(1, 1, 0, 0));
-              record.put(BOOLEAN_FROM_STRING_ARRAY_COLUMN, List.of("true", 
"true", "false", "false"));
-              record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
-              record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
-              return record;
-            }
-        ));
+        writers.get(i % getNumAvroFiles())
+            .append(recordCache.get((int) (i % (getCountStarResult() / 10)), 
() -> {
+                  // create avro record
+                  GenericData.Record record = new 
GenericData.Record(avroSchema);
+                  record.put(BOOLEAN_COLUMN, finalI % 4 == 0 || finalI % 4 == 
1);
+                  record.put(BOOLEAN_FROM_INT_COLUMN, finalI % 4 == 0 || 
finalI % 4 == 1 ? 1 : 0);
+                  record.put(BOOLEAN_FROM_STRING_COLUMN, finalI % 4 == 0 || 
finalI % 4 == 1 ? "true" : "false");
+                  record.put(INT_COLUMN, finalI);
+                  record.put(LONG_COLUMN, finalI);
+                  record.put(FLOAT_COLUMN, finalI + RANDOM.nextFloat());
+                  record.put(DOUBLE_COLUMN, finalI + RANDOM.nextDouble());
+                  record.put(STRING_COLUMN, RandomStringUtils.random(finalI));
+                  record.put(TIMESTAMP_COLUMN, finalI);
+                  record.put(GROUP_BY_COLUMN, String.valueOf(finalI % 10));
+                  record.put(BOOLEAN_ARRAY_COLUMN, List.of(true, true, false, 
false));
+                  record.put(BOOLEAN_FROM_INT_ARRAY_COLUMN, List.of(1, 1, 0, 
0));
+                  record.put(BOOLEAN_FROM_STRING_ARRAY_COLUMN, List.of("true", 
"true", "false", "false"));
+                  record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
+                  record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
+                  return record;
+                }
+            ));
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
index 40a2739745a..1c1d54ccb91 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.pinot.common.function.scalar.DataTypeConversionFunctions;
 import org.apache.pinot.common.function.scalar.StringFunctions;
@@ -124,9 +123,8 @@ public class BytesTypeTest extends 
CustomDataQueryClusterIntegrationTest {
 
     ));
 
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
         GenericData.Record record = new GenericData.Record(avroSchema);
         byte[] bytes = newRandomBytes(RANDOM.nextInt(100) * 2 + 2);
@@ -150,10 +148,10 @@ public class BytesTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         record.put(RANDOM_BYTES, ByteBuffer.wrap(randomBytes));
         record.put(FIXED_STRING, FIXED_HEX_STRIING_VALUE);
         record.put(FIXED_BYTES, 
ByteBuffer.wrap(DataTypeConversionFunctions.hexToBytes(FIXED_HEX_STRIING_VALUE)));
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   private static String newRandomBase64String() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
index 121fc335e38..1eb8809fcdf 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
@@ -23,10 +23,8 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
 import java.util.List;
-import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -174,21 +172,18 @@ public class CpcSketchTest extends 
CustomDataQueryClusterIntegrationTest {
             null), new org.apache.avro.Schema.Field(MET_CPC_SKETCH_BYTES,
             org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES), 
null, null)));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    Random random = new Random();
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
-        record.put(ID, random.nextInt(10));
+        record.put(ID, RANDOM.nextInt(10));
         record.put(MET_CPC_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   private byte[] getRandomRawValue() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 5c3b9b2cf96..72f7acdc9dd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pinot.integration.tests.custom;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
@@ -133,6 +138,12 @@ public abstract class 
CustomDataQueryClusterIntegrationTest extends BaseClusterI
     LOGGER.warn("Finished tearing down integration test class: {}", 
getClass().getSimpleName());
   }
 
+  @Override
+  protected void startServer()
+      throws Exception {
+    startServers(2);
+  }
+
   @Override
   protected void pushAvroIntoKafka(List<File> avroFiles)
       throws Exception {
@@ -262,7 +273,51 @@ public abstract class 
CustomDataQueryClusterIntegrationTest extends BaseClusterI
   public abstract List<File> createAvroFiles()
       throws Exception;
 
+  public int getNumAvroFiles() {
+    return 2;
+  }
+
   public boolean isRealtimeTable() {
     return false;
   }
+
+  protected AvroFilesAndWriters 
createAvroFilesAndWriters(org.apache.avro.Schema avroSchema)
+      throws IOException {
+    List<File> avroFiles = new ArrayList<>();
+    List<DataFileWriter<GenericData.Record>> writers = new ArrayList<>();
+    for (int i = 0; i < getNumAvroFiles(); i++) {
+      File avroFile = new File(_tempDir, "data-" + i + ".avro");
+      avroFiles.add(avroFile);
+      DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema));
+      writers.add(fileWriter);
+      fileWriter.create(avroSchema, avroFile);
+    }
+    return new AvroFilesAndWriters(avroFiles, writers);
+  }
+
+  protected static class AvroFilesAndWriters implements Closeable {
+    private final List<File> _avroFiles;
+    private final List<DataFileWriter<GenericData.Record>> _writers;
+
+    AvroFilesAndWriters(List<File> avroFiles, 
List<DataFileWriter<GenericData.Record>> writers) {
+      _avroFiles = avroFiles;
+      _writers = writers;
+    }
+
+    public List<File> getAvroFiles() {
+      return _avroFiles;
+    }
+
+    public List<DataFileWriter<GenericData.Record>> getWriters() {
+      return _writers;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      for (DataFileWriter<GenericData.Record> writer : _writers) {
+        writer.close();
+      }
+    }
+  }
 }
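
For orientation, a condensed, hypothetical createAvroFiles() implementation (the
Avro schema and column name are made up; imports match the surrounding tests)
showing the pattern the test changes below follow with the new helper: one writer
per Avro file, records spread round-robin so every file, and therefore every
segment and server, receives data.

  @Override
  public List<File> createAvroFiles()
      throws Exception {
    org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.record("SketchRecord")
        .fields().requiredInt("intCol").endRecord();
    try (AvroFilesAndWriters avroFilesAndWriters = createAvroFilesAndWriters(avroSchema)) {
      List<DataFileWriter<GenericData.Record>> writers = avroFilesAndWriters.getWriters();
      for (int i = 0; i < getCountStarResult(); i++) {
        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put("intCol", i);
        // Round-robin across writers so each generated Avro file gets rows.
        writers.get(i % getNumAvroFiles()).append(record);
      }
      return avroFilesAndWriters.getAvroFiles();
    }
  }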
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
index f2576ae4910..83048027b7e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.List;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -96,12 +95,10 @@ public class FloatingPointDataTypeTest extends 
CustomDataQueryClusterIntegration
         new org.apache.avro.Schema.Field(MET_FLOAT_UNSORTED_NO_DIC,
             org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), 
null, null)));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      double sortedValue = 0.0;
-      double unsortedValue = 0.05;
+    double sortedValue = 0.0;
+    double unsortedValue = 0.05;
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < NUM_DOCS; i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
@@ -120,10 +117,10 @@ public class FloatingPointDataTypeTest extends 
CustomDataQueryClusterIntegration
         }
 
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
index 14cccfb0e34..3705ef70e9b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.core.geospatial.transform.function.ScalarFunctions;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.segment.local.utils.GeometryUtils;
@@ -161,10 +160,10 @@ public class GeoSpatialTest extends 
CustomDataQueryClusterIntegrationTest {
             org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), 
null, null)
     ));
 
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
+        int fileIdx = Math.min(i / (NUM_TOTAL_DOCS / getNumAvroFiles()), 
getNumAvroFiles() - 1);
         GenericData.Record record = new GenericData.Record(avroSchema);
         record.put(DIM_NAME, "dim" + i);
         Point point =
@@ -181,11 +180,10 @@ public class GeoSpatialTest extends 
CustomDataQueryClusterIntegrationTest {
         record.put(AREA_GEOM_SIZE_NAME, AREA_GEOM_SIZE_DATA[i % 
AREA_GEOM_SIZE_DATA.length]);
         record.put(AREA_GEOG_NAME, AREA_GEOG_DATA[i % AREA_GEOG_DATA.length]);
         record.put(AREA_GEOG_SIZE_NAME, AREA_GEOG_SIZE_DATA[i % 
AREA_GEOG_SIZE_DATA.length]);
-        fileWriter.append(record);
+        writers.get(fileIdx).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-
-    return List.of(avroFile);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index 593b0c40301..dd19bd51348 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.common.function.JsonPathCache;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -51,18 +50,18 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
 
   protected static final String DEFAULT_TABLE_NAME = "JsonPathTest";
 
-  protected static final int NUM_TOTAL_DOCS = 1000;
+  protected static final int NUM_DOCS_PER_SEGMENT = 1000;
   private static final String MY_MAP_STR_FIELD_NAME = "myMapStr";
   private static final String MY_MAP_STR_K1_FIELD_NAME = "myMapStr_k1";
   private static final String MY_MAP_STR_K2_FIELD_NAME = "myMapStr_k2";
   private static final String COMPLEX_MAP_STR_FIELD_NAME = "complexMapStr";
   private static final String COMPLEX_MAP_STR_K3_FIELD_NAME = 
"complexMapStr_k3";
 
-  protected final List<String> _sortedSequenceIds = new 
ArrayList<>(NUM_TOTAL_DOCS);
+  protected final List<String> _sortedSequenceIds = new 
ArrayList<>(NUM_DOCS_PER_SEGMENT);
 
   @Override
   protected long getCountStarResult() {
-    return NUM_TOTAL_DOCS;
+    return (long) NUM_DOCS_PER_SEGMENT * getNumAvroFiles();
   }
 
   @Override
@@ -105,10 +104,8 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
                 org.apache.avro.Schema.Type.STRING), null, null));
     avroSchema.setFields(fields);
 
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      for (int i = 0; i < NUM_TOTAL_DOCS; i++) {
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      for (int i = 0; i < NUM_DOCS_PER_SEGMENT; i++) {
         Map<String, String> map = new HashMap<>();
         map.put("k1", "value-k1-" + i);
         map.put("k2", "value-k2-" + i);
@@ -123,13 +120,14 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
             Map.of("k4-k1", "value-k4-k1-" + i, "k4-k2", "value-k4-k2-" + i, 
"k4-k3", "value-k4-k3-" + i,
                 "met", i));
         record.put(COMPLEX_MAP_STR_FIELD_NAME, 
JsonUtils.objectToString(complexMap));
-        fileWriter.append(record);
+        for (DataFileWriter<GenericData.Record> writer : 
avroFilesAndWriters.getWriters()) {
+          writer.append(record);
+        }
         _sortedSequenceIds.add(String.valueOf(i));
       }
+      Collections.sort(_sortedSequenceIds);
+      return avroFilesAndWriters.getAvroFiles();
     }
-    Collections.sort(_sortedSequenceIds);
-
-    return List.of(avroFile);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
@@ -199,19 +197,19 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
       String value = rows.get(i).get(0).textValue();
       Map<?, ?> results = JsonUtils.stringToObject(value, Map.class);
       Assert.assertTrue(value.indexOf("-k1-") > 0);
-      Assert.assertEquals(results.get("k1"), "value-k1-" + i);
-      Assert.assertEquals(results.get("k2"), "value-k2-" + i);
+      Assert.assertEquals(results.get("k1"), "value-k1-" + i % 
NUM_DOCS_PER_SEGMENT);
+      Assert.assertEquals(results.get("k2"), "value-k2-" + i % 
NUM_DOCS_PER_SEGMENT);
       final List<?> k3 = (List<?>) results.get("k3");
       Assert.assertEquals(k3.size(), 3);
-      Assert.assertEquals(k3.get(0), "value-k3-0-" + i);
-      Assert.assertEquals(k3.get(1), "value-k3-1-" + i);
-      Assert.assertEquals(k3.get(2), "value-k3-2-" + i);
+      Assert.assertEquals(k3.get(0), "value-k3-0-" + i % NUM_DOCS_PER_SEGMENT);
+      Assert.assertEquals(k3.get(1), "value-k3-1-" + i % NUM_DOCS_PER_SEGMENT);
+      Assert.assertEquals(k3.get(2), "value-k3-2-" + i % NUM_DOCS_PER_SEGMENT);
       final Map<?, ?> k4 = (Map<?, ?>) results.get("k4");
       Assert.assertEquals(k4.size(), 4);
-      Assert.assertEquals(k4.get("k4-k1"), "value-k4-k1-" + i);
-      Assert.assertEquals(k4.get("k4-k2"), "value-k4-k2-" + i);
-      Assert.assertEquals(k4.get("k4-k3"), "value-k4-k3-" + i);
-      Assert.assertEquals(Double.parseDouble(k4.get("met").toString()), i);
+      Assert.assertEquals(k4.get("k4-k1"), "value-k4-k1-" + i % 
NUM_DOCS_PER_SEGMENT);
+      Assert.assertEquals(k4.get("k4-k2"), "value-k4-k2-" + i % 
NUM_DOCS_PER_SEGMENT);
+      Assert.assertEquals(k4.get("k4-k3"), "value-k4-k3-" + i % 
NUM_DOCS_PER_SEGMENT);
+      Assert.assertEquals(Double.parseDouble(k4.get("met").toString()), i % 
NUM_DOCS_PER_SEGMENT);
     }
 
     //Filter Query
@@ -220,7 +218,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
     Assert.assertNotNull(rows);
-    Assert.assertEquals(rows.size(), 1);
+    Assert.assertEquals(rows.size(), getNumAvroFiles());
     for (int i = 0; i < rows.size(); i++) {
       String value = rows.get(i).get(0).textValue();
       Map<?, ?> k4 = JsonUtils.stringToObject(value, Map.class);
@@ -233,7 +231,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
 
     //selection order by
     query = "Select complexMapStr from " + getTableName()
-        + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') 
DESC LIMIT " + NUM_TOTAL_DOCS;
+        + " order by jsonExtractScalar(complexMapStr,'$.k4.k4-k1','STRING') 
DESC LIMIT " + NUM_DOCS_PER_SEGMENT;
     pinotResponse = postQuery(query);
     rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
     Assert.assertNotNull(rows);
@@ -242,7 +240,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
       String value = rows.get(i).get(0).textValue();
       Assert.assertTrue(value.indexOf("-k1-") > 0);
       Map<?, ?> results = JsonUtils.stringToObject(value, Map.class);
-      String seqId = _sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
+      String seqId = _sortedSequenceIds.get(NUM_DOCS_PER_SEGMENT - 1 - i / 
getNumAvroFiles());
       Assert.assertEquals(results.get("k1"), "value-k1-" + seqId);
       Assert.assertEquals(results.get("k2"), "value-k2-" + seqId);
       final List<?> k3 = (List<?>) results.get("k3");
@@ -271,10 +269,10 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
     Assert.assertNotNull(pinotResponse.get("resultTable").get("rows"));
     ArrayNode rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
     for (int i = 0; i < rows.size(); i++) {
-      String seqId = _sortedSequenceIds.get(NUM_TOTAL_DOCS - 1 - i);
+      String seqId = _sortedSequenceIds.get(NUM_DOCS_PER_SEGMENT - 1 - i);
       final JsonNode row = rows.get(i);
       Assert.assertEquals(row.get(0).asText(), "value-k1-" + seqId);
-      Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId));
+      Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId) * 
getNumAvroFiles());
     }
   }
 
@@ -291,10 +289,10 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
     Assert.assertNotNull(pinotResponse.get("resultTable").get("rows"));
     ArrayNode rows = (ArrayNode) pinotResponse.get("resultTable").get("rows");
     for (int i = 0; i < rows.size(); i++) {
-      String seqId = String.valueOf(NUM_TOTAL_DOCS - 1 - i);
+      String seqId = String.valueOf(NUM_DOCS_PER_SEGMENT - 1 - i);
       final JsonNode row = rows.get(i);
       Assert.assertEquals(row.get(0).asText(), "value-k1-" + seqId);
-      Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId));
+      Assert.assertEquals(row.get(1).asDouble(), Double.parseDouble(seqId) * 
getNumAvroFiles());
     }
   }
 
@@ -313,7 +311,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
     Assert.assertEquals(rows.size(), 1);
     final JsonNode row = rows.get(0);
     Assert.assertEquals(row.get(0).asText(), "defaultKey");
-    Assert.assertEquals(row.get(1).asDouble(), 1000.0);
+    Assert.assertEquals(row.get(1).asDouble(), 1000.0 * getNumAvroFiles());
   }
 
   @Test(dataProvider = "useBothQueryEngines")
@@ -331,7 +329,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
     Assert.assertEquals(rows.size(), 1);
     final JsonNode row = rows.get(0);
     Assert.assertEquals(row.get(0).asText(), "defaultKey");
-    Assert.assertTrue(Math.abs(row.get(1).asDouble() - 100.0) < 1e-10);
+    Assert.assertTrue(Math.abs(row.get(1).asDouble() - 100.0 * 
getNumAvroFiles()) < 1e-10);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
index 8d54850f23e..1273a28666e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -107,10 +106,9 @@ public class MapFieldTypeRealtimeTest extends 
CustomDataQueryClusterIntegrationT
                 null));
     avroSchema.setFields(fields);
 
-    File avroFile = new File(_tempDir, "data.avro");
     long tsBase = System.currentTimeMillis();
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < NUM_DOCS; i++) {
         Map<String, String> stringMap = new HashMap<>();
         Map<String, Integer> intMap = new HashMap<>();
@@ -123,10 +121,10 @@ public class MapFieldTypeRealtimeTest extends 
CustomDataQueryClusterIntegrationT
         record.put(STRING_MAP_FIELD_NAME, stringMap);
         record.put(INT_MAP_FIELD_NAME, intMap);
         record.put(TIMESTAMP_FIELD_NAME, tsBase + i);
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   protected int getSelectionDefaultDocCount() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
index 2974d20482f..7c8a3904485 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -48,14 +47,14 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
   // Default settings
   private static final int V1_DEFAULT_SELECTION_COUNT = 10;
   protected static final String DEFAULT_TABLE_NAME = "MapFieldTypeTest";
-  private static final int NUM_DOCS = 1000;
+  private static final int NUM_DOCS_PER_SEGMENT = 1000;
   private static final String STRING_MAP_FIELD_NAME = "stringMap";
   private static final String INT_MAP_FIELD_NAME = "intMap";
   private int _setSelectionDefaultDocCount = 10;
 
   @Override
   protected long getCountStarResult() {
-    return NUM_DOCS;
+    return (long) NUM_DOCS_PER_SEGMENT * getNumAvroFiles();
   }
 
   @Override
@@ -141,10 +140,8 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
             new org.apache.avro.Schema.Field(INT_MAP_FIELD_NAME, 
intMapAvroSchema, null, null));
     avroSchema.setFields(fields);
 
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      for (int i = 0; i < NUM_DOCS; i++) {
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      for (int i = 0; i < NUM_DOCS_PER_SEGMENT; i++) {
         Map<String, String> stringMap = new HashMap<>();
         Map<String, Integer> intMap = new HashMap<>();
         for (int j = 0; j < i; j++) {
@@ -155,10 +152,12 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         GenericData.Record record = new GenericData.Record(avroSchema);
         record.put(STRING_MAP_FIELD_NAME, stringMap);
         record.put(INT_MAP_FIELD_NAME, intMap);
-        fileWriter.append(record);
+        for (DataFileWriter<GenericData.Record> writer : 
avroFilesAndWriters.getWriters()) {
+          writer.append(record);
+        }
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   protected int getSelectionDefaultDocCount() {
@@ -178,9 +177,9 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
       JsonNode intMap = rows.get(i).get(0);
       JsonNode stringMap = rows.get(i).get(1);
-      for (int j = 0; j < i; j++) {
-        assertEquals(intMap.get("k" + j).intValue(), i);
-        assertEquals(stringMap.get("k" + j).textValue(), "v" + i);
+      for (int j = 0; j < i % NUM_DOCS_PER_SEGMENT; j++) {
+        assertEquals(intMap.get("k" + j).intValue(), i % NUM_DOCS_PER_SEGMENT);
+        assertEquals(stringMap.get("k" + j).textValue(), "v" + i % 
NUM_DOCS_PER_SEGMENT);
       }
     }
     // Selection only
@@ -192,9 +191,13 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
 
     assertEquals(rows.get(0).get(0).textValue(), "null");
     assertEquals(rows.get(0).get(1).intValue(), -2147483648);
-    for (int i = 1; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).textValue(), "v" + i);
-      assertEquals(rows.get(i).get(1).intValue(), i);
+    for (int i = 1; i < NUM_DOCS_PER_SEGMENT; i++) {
+      for (int j = 0; j < getNumAvroFiles(); j++) {
+        if (i + NUM_DOCS_PER_SEGMENT < getSelectionDefaultDocCount()) {
+          assertEquals(rows.get(i + NUM_DOCS_PER_SEGMENT * 
j).get(0).textValue(), "v" + i);
+          assertEquals(rows.get(i + NUM_DOCS_PER_SEGMENT * 
j).get(1).intValue(), i);
+        }
+      }
     }
 
     // Selection order-by
@@ -208,22 +211,28 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(rows.get(0).get(0).intValue(), -2147483648);
     assertEquals(rows.get(0).get(1).intValue(), -2147483648);
     assertEquals(rows.get(0).get(2).textValue(), "null");
-    assertEquals(rows.get(1).get(0).intValue(), 1);
+    assertEquals(rows.get(1).get(0).intValue(), -2147483648);
     assertEquals(rows.get(1).get(1).intValue(), -2147483648);
-    assertEquals(rows.get(1).get(2).textValue(), "v1");
-    for (int i = 2; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), i);
-      assertEquals(rows.get(i).get(1).intValue(), i);
-      assertEquals(rows.get(i).get(2).textValue(), "v" + i);
-      assertEquals(rows.get(i).get(3).textValue(), "v" + i);
+    assertEquals(rows.get(1).get(2).textValue(), "null");
+    assertEquals(rows.get(2).get(0).intValue(), 1);
+    assertEquals(rows.get(2).get(1).intValue(), -2147483648);
+    assertEquals(rows.get(2).get(2).textValue(), "v1");
+    assertEquals(rows.get(3).get(0).intValue(), 1);
+    assertEquals(rows.get(3).get(1).intValue(), -2147483648);
+    assertEquals(rows.get(3).get(2).textValue(), "v1");
+    for (int i = 4; i < getSelectionDefaultDocCount(); i++) {
+      assertEquals(rows.get(i).get(0).intValue(), i / getNumAvroFiles());
+      assertEquals(rows.get(i).get(1).intValue(), i / getNumAvroFiles());
+      assertEquals(rows.get(i).get(2).textValue(), "v" + i / 
getNumAvroFiles());
+      assertEquals(rows.get(i).get(3).textValue(), "v" + i / 
getNumAvroFiles());
     }
 
     // Aggregation only
     query = "SELECT MAX(intMap['k0']), MAX(intMap['k1']) FROM " + 
getTableName();
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
-    
assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(0).intValue(),
 NUM_DOCS - 1);
-    
assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(1).intValue(),
 NUM_DOCS - 1);
+    
assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(0).intValue(),
 NUM_DOCS_PER_SEGMENT - 1);
+    
assertEquals(pinotResponse.get("resultTable").get("rows").get(0).get(1).intValue(),
 NUM_DOCS_PER_SEGMENT - 1);
 
     // Aggregation group-by
     query = "SELECT stringMap['k0'] AS key, MIN(intMap['k0']) AS value FROM " 
+ getTableName()
@@ -231,10 +240,10 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), getSelectionDefaultDocCount());
+    assertEquals(rows.size(), Math.min(getSelectionDefaultDocCount(), 
NUM_DOCS_PER_SEGMENT));
     assertEquals(rows.get(0).get(0).textValue(), "null");
     assertEquals(rows.get(0).get(1).intValue(), Integer.MIN_VALUE);
-    for (int i = 1; i < getSelectionDefaultDocCount(); i++) {
+    for (int i = 1; i < Math.min(getSelectionDefaultDocCount(), 
NUM_DOCS_PER_SEGMENT); i++) {
       assertEquals(rows.get(i).get(0).textValue(), "v" + i);
       assertEquals(rows.get(i).get(1).intValue(), i);
     }
@@ -244,14 +253,14 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
+    assertEquals(rows.size(), getNumAvroFiles());
     assertEquals(rows.get(0).get(0).textValue(), "v25");
 
     query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['k1'] = 25";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
+    assertEquals(rows.size(), getNumAvroFiles());
     assertEquals(rows.get(0).get(0).intValue(), 25);
 
     // Filter on non-existing key
@@ -324,7 +333,7 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     JsonNode rows = pinotResponse.get("resultTable").get("rows");
     // Only records with k1 = 'v25' or 'v26' should be returned
-    assertEquals(rows.size(), 2);
+    assertEquals(rows.size(), 2 * getNumAvroFiles());
 
     // Verify the returned values
     for (int i = 0; i < rows.size(); i++) {
@@ -338,7 +347,7 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
     // Only records with k1 = 25 or 26 should be returned
-    assertEquals(rows.size(), 2);
+    assertEquals(rows.size(), 2 * getNumAvroFiles());
 
     // Verify the returned values
     for (int i = 0; i < rows.size(); i++) {
@@ -443,6 +452,6 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
   @Override
   protected void setUseMultiStageQueryEngine(boolean useMultiStageQueryEngine) 
{
     super.setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    _setSelectionDefaultDocCount = useMultiStageQueryEngine ? NUM_DOCS : 
V1_DEFAULT_SELECTION_COUNT;
+    _setSelectionDefaultDocCount = useMultiStageQueryEngine ? (int) 
getCountStarResult() : V1_DEFAULT_SELECTION_COUNT;
   }
 }
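
The assertion changes in MapFieldTypeTest follow from every Avro file receiving a full copy of the NUM_DOCS_PER_SEGMENT records. A small self-contained sketch of that arithmetic is given below; the class and method names are illustrative and not part of the commit.

    // Illustrative only: mirrors the expectation arithmetic behind the assertions above.
    final class MapFieldTypeExpectations {
      private MapFieldTypeExpectations() {
      }

      // Every Avro file receives a full copy of the records, so COUNT(*) scales with the file count.
      static long expectedCountStar(int numDocsPerSegment, int numAvroFiles) {
        return (long) numDocsPerSegment * numAvroFiles;
      }

      // A filter such as stringMap['k1'] IN ('v25', 'v26') matches the same records in every file.
      static int expectedFilterRows(int matchesPerFile, int numAvroFiles) {
        return matchesPerFile * numAvroFiles;
      }

      // Duplicated keys collapse under GROUP BY, so the file count drops out of the group count.
      static int expectedGroupByRows(int selectionDefaultDocCount, int numDocsPerSegment) {
        return Math.min(selectionDefaultDocCount, numDocsPerSegment);
      }
    }
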
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
index fdf88db4108..55285092bc8 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -46,7 +45,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
 
   // Default settings
   protected static final String DEFAULT_TABLE_NAME = "MapTypeTest";
-  private static final int NUM_DOCS = 1000;
+  private static final int NUM_DOCS_PER_SEGMENT = 1000;
   private static final String STRING_KEY_MAP_FIELD_NAME = "stringKeyMap";
   private static final String INT_KEY_MAP_FIELD_NAME = "intKeyMap";
   private static final String STRING_KEY_MAP_STR_FIELD_NAME = 
"stringKeyMapStr";
@@ -55,7 +54,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
 
   @Override
   protected long getCountStarResult() {
-    return NUM_DOCS;
+    return (long) NUM_DOCS_PER_SEGMENT * getNumAvroFiles();
   }
 
   @Override
@@ -99,24 +98,23 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
             new org.apache.avro.Schema.Field(INT_KEY_MAP_FIELD_NAME, 
intKeyMapAvroSchema, null, null));
     avroSchema.setFields(fields);
 
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      for (int i = 0; i < NUM_DOCS; i++) {
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      for (int i = 0; i < NUM_DOCS_PER_SEGMENT; i++) {
         Map<String, Integer> stringKeyMap = new HashMap<>();
         stringKeyMap.put("k1", i);
-        stringKeyMap.put("k2", NUM_DOCS + i);
+        stringKeyMap.put("k2", NUM_DOCS_PER_SEGMENT + i);
         Map<Integer, String> intKeyMap = new HashMap<>();
         intKeyMap.put(95, Integer.toString(i));
-        intKeyMap.put(717, Integer.toString(NUM_DOCS + i));
+        intKeyMap.put(717, Integer.toString(NUM_DOCS_PER_SEGMENT + i));
         GenericData.Record record = new GenericData.Record(avroSchema);
         record.put(STRING_KEY_MAP_FIELD_NAME, stringKeyMap);
         record.put(INT_KEY_MAP_FIELD_NAME, intKeyMap);
-        fileWriter.append(record);
+        for (DataFileWriter<GenericData.Record> writer : 
avroFilesAndWriters.getWriters()) {
+          writer.append(record);
+        }
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-
-    return List.of(avroFile);
   }
 
   protected int getSelectionDefaultDocCount() {
@@ -134,7 +132,8 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     JsonNode rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).textValue(), 
String.format("{\"k1\":%d,\"k2\":%d}", i, NUM_DOCS + i));
+      assertEquals(rows.get(i).get(0).textValue(), 
String.format("{\"k1\":%d,\"k2\":%d}", i % NUM_DOCS_PER_SEGMENT,
+          NUM_DOCS_PER_SEGMENT + i % NUM_DOCS_PER_SEGMENT));
     }
     query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k1', 'INT') FROM " + 
getTableName();
     pinotResponse = postQuery(query);
@@ -142,7 +141,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), i);
+      assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
     }
     query = "SELECT jsonExtractScalar(intKeyMapStr, '$.95', 'INT') FROM " + 
getTableName();
     pinotResponse = postQuery(query);
@@ -150,7 +149,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), i);
+      assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
     }
 
     // Selection order-by
@@ -161,7 +160,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i / 
getNumAvroFiles());
     }
     query = "SELECT jsonExtractScalar(intKeyMapStr, '$.717', 'INT') FROM " + 
getTableName()
         + " ORDER BY jsonExtractScalar(intKeyMapStr, '$.95', 'INT')";
@@ -170,7 +169,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i / 
getNumAvroFiles());
     }
 
     // Aggregation only
@@ -178,12 +177,12 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     JsonNode aggregationResult = 
pinotResponse.get("resultTable").get("rows").get(0).get(0);
-    assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+    assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
     query = "SELECT MAX(jsonExtractScalar(intKeyMapStr, '$.95', 'INT')) FROM " 
+ getTableName();
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     aggregationResult = 
pinotResponse.get("resultTable").get("rows").get(0).get(0);
-    assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+    assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
 
     // Aggregation group-by
     query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k1', 'INT') AS key, "
@@ -192,10 +191,11 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), getSelectionDefaultDocCount());
-    for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+    int expectedReturnRows = Math.min(getSelectionDefaultDocCount(), 
NUM_DOCS_PER_SEGMENT);
+    assertEquals(rows.size(), expectedReturnRows);
+    for (int i = 0; i < expectedReturnRows; i++) {
       assertEquals(rows.get(i).get(0).intValue(), i);
-      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
     }
     query = "SELECT jsonExtractScalar(intKeyMapStr, '$.95', 'INT') AS key, "
         + "MIN(jsonExtractScalar(intKeyMapStr, '$.717', 'INT')) AS value FROM 
" + getTableName()
@@ -203,10 +203,10 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), getSelectionDefaultDocCount());
-    for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+    assertEquals(rows.size(), expectedReturnRows);
+    for (int i = 0; i < expectedReturnRows; i++) {
       assertEquals(rows.get(i).get(0).intValue(), i);
-      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
     }
 
     // Filter
@@ -215,15 +215,15 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+    assertEquals(rows.size(), getNumAvroFiles());
+    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
     query = "SELECT jsonExtractScalar(intKeyMapStr, '$.717', 'INT') FROM " + 
getTableName()
         + " WHERE jsonExtractScalar(intKeyMapStr, '$.95', 'INT') = 25";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+    assertEquals(rows.size(), getNumAvroFiles());
+    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
 
     // Select non-existing key (illegal query)
     query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k3', 'INT') FROM " + 
getTableName();
@@ -253,7 +253,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     JsonNode rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), i);
+      assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
     }
     query = "SELECT mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) FROM " + 
getTableName();
     pinotResponse = postQuery(query);
@@ -261,7 +261,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), i);
+      assertEquals(rows.get(i).get(0).intValue(), i % NUM_DOCS_PER_SEGMENT);
     }
 
     // Selection order-by
@@ -272,7 +272,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i / 
getNumAvroFiles());
     }
     query = "SELECT mapValue(intKeyMap__KEYS, 717, intKeyMap__VALUES) FROM " + 
getTableName()
         + " ORDER BY mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES)";
@@ -281,7 +281,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), getSelectionDefaultDocCount());
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(0).intValue(), NUM_DOCS_PER_SEGMENT + i / 
getNumAvroFiles());
     }
 
     // Aggregation only
@@ -289,12 +289,12 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     JsonNode aggregationResult = 
pinotResponse.get("resultTable").get("rows").get(0).get(0);
-    assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+    assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
     query = "SELECT MAX(mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES)) FROM 
" + getTableName();
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     aggregationResult = 
pinotResponse.get("resultTable").get("rows").get(0).get(0);
-    assertEquals(aggregationResult.intValue(), NUM_DOCS - 1);
+    assertEquals(aggregationResult.intValue(), NUM_DOCS_PER_SEGMENT - 1);
 
     // Aggregation group-by
     query = "SELECT mapValue(stringKeyMap__KEYS, 'k1', stringKeyMap__VALUES) 
AS key, "
@@ -303,10 +303,11 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), getSelectionDefaultDocCount());
-    for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+    int expectedReturnRows = Math.min(getSelectionDefaultDocCount(), 
NUM_DOCS_PER_SEGMENT);
+    assertEquals(rows.size(), expectedReturnRows);
+    for (int i = 0; i < expectedReturnRows; i++) {
       assertEquals(rows.get(i).get(0).intValue(), i);
-      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
     }
     query = "SELECT mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) AS key, "
         + "MIN(mapValue(intKeyMap__KEYS, 717, intKeyMap__VALUES)) AS value 
FROM " + getTableName()
@@ -314,10 +315,10 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), getSelectionDefaultDocCount());
-    for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
+    assertEquals(rows.size(), expectedReturnRows);
+    for (int i = 0; i < expectedReturnRows; i++) {
       assertEquals(rows.get(i).get(0).intValue(), i);
-      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS + i);
+      assertEquals(rows.get(i).get(1).intValue(), NUM_DOCS_PER_SEGMENT + i);
     }
 
     // Filter
@@ -326,15 +327,15 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+    assertEquals(rows.size(), getNumAvroFiles());
+    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
     query = "SELECT mapValue(intKeyMap__KEYS, 717, intKeyMap__VALUES) FROM " + 
getTableName()
         + " WHERE mapValue(intKeyMap__KEYS, 95, intKeyMap__VALUES) = 25";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
-    assertEquals(rows.size(), 1);
-    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS + 25);
+    assertEquals(rows.size(), getNumAvroFiles());
+    assertEquals(rows.get(0).get(0).intValue(), NUM_DOCS_PER_SEGMENT + 25);
 
     // Filter on non-existing key
     query = "SELECT mapValue(stringKeyMap__KEYS, 'k2', stringKeyMap__VALUES) 
FROM " + getTableName()
@@ -414,6 +415,6 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
   @Override
   protected void setUseMultiStageQueryEngine(boolean useMultiStageQueryEngine) 
{
     super.setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    _setSelectionDefaultDocCount = useMultiStageQueryEngine ? 1000 : 10;
+    _setSelectionDefaultDocCount = useMultiStageQueryEngine ? 
NUM_DOCS_PER_SEGMENT * getNumAvroFiles() : 10;
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
index cea7e334061..ed1f439401d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
@@ -37,7 +37,6 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
@@ -230,7 +229,6 @@ public class MultiColumnTextIndicesTest extends 
CustomDataQueryClusterIntegratio
     }
     assertEquals(skills.size(), NUM_SKILLS);
 
-    File avroFile = new File(_tempDir, "data.avro");
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     avroSchema.setFields(Arrays.asList(
         new Field(NULLABLE_TEXT_COL, createUnion(create(Type.NULL), 
create(Type.STRING)), null, null),
@@ -248,8 +246,8 @@ public class MultiColumnTextIndicesTest extends 
CustomDataQueryClusterIntegratio
 
     List<String> valueList = List.of("value");
 
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < NUM_RECORDS; i++) {
         GenericData.Record record = new GenericData.Record(avroSchema);
         record.put(NULLABLE_TEXT_COL, (i & 1) == 1 ? null : "value");
@@ -264,10 +262,10 @@ public class MultiColumnTextIndicesTest extends 
CustomDataQueryClusterIntegratio
         record.put(DICT_TEXT_COL_CASE_SENSITIVE_MV, List.of(skills.get(i % 
NUM_SKILLS), "" + i));
         record.put(TEXT_COL_NATIVE, skills.get(i % NUM_SKILLS));
         record.put(TIME_COL, System.currentTimeMillis());
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
index 2687f0529d9..7bd54a4d195 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
@@ -24,10 +24,8 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -81,17 +79,14 @@ public class SumPrecisionTest extends 
CustomDataQueryClusterIntegrationTest {
         new org.apache.avro.Schema.Field(MET_LONG, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
             null, null)));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      int dimCardinality = 50;
-      BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
-      Random random = new Random();
+    int dimCardinality = 50;
+    BigDecimal bigDecimalBase = BigDecimal.valueOf(Integer.MAX_VALUE + 1L);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
-        record.put(DIM_NAME, "dim" + (random.nextInt() % dimCardinality));
+        record.put(DIM_NAME, "dim" + (RANDOM.nextInt() % dimCardinality));
         BigDecimal bigDecimalValue = bigDecimalBase.add(BigDecimal.valueOf(i));
 
         record.put(MET_BIG_DECIMAL_BYTES, 
ByteBuffer.wrap(BigDecimalUtils.serialize(bigDecimalValue)));
@@ -100,10 +95,10 @@ public class SumPrecisionTest extends 
CustomDataQueryClusterIntegrationTest {
         record.put(MET_LONG, bigDecimalValue.longValue());
 
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
index 09d63a027f2..34d374e4a70 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
@@ -33,7 +33,6 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -185,7 +184,6 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
     }
     assertEquals(skills.size(), NUM_SKILLS);
 
-    File avroFile = new File(_tempDir, "data.avro");
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     avroSchema.setFields(Arrays.asList(
         new Field(TEXT_COLUMN_NULL_NAME, createUnion(create(Type.NULL), 
create(Type.STRING)), null, null),
@@ -197,8 +195,8 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
             create(Type.STRING), null, null),
         new Field(TIME_COLUMN_NAME,
             create(Type.LONG), null, null)));
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < NUM_RECORDS; i++) {
         GenericData.Record record = new GenericData.Record(avroSchema);
         record.put(TEXT_COLUMN_NULL_NAME, i % 2 == 0 ? null : skills.get(i % 
NUM_SKILLS));
@@ -206,10 +204,10 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
         record.put(TEXT_COLUMN_NAME_CASE_SENSITIVE, skills.get(i % 
NUM_SKILLS));
         record.put(TEXT_COLUMN_NAME_NATIVE, skills.get(i % NUM_SKILLS));
         record.put(TIME_COLUMN_NAME, System.currentTimeMillis());
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
index 43c5a79ee0d..04f2964de89 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.datasketches.theta.UpdateSketch;
@@ -99,13 +98,11 @@ public class ThetaSketchTest extends 
CustomDataQueryClusterIntegrationTest {
         new org.apache.avro.Schema.Field(THETA_SKETCH, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES),
             null, null)));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-
-      int studentId = 0;
-      int cardinality = 50;
+    int studentId = 0;
+    int cardinality = 50;
+    int recordId = 0;
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int shardId = 0; shardId < 2; shardId++) {
 
         // populate student-course data (studentId, gender, course) for this 
shard id
@@ -142,7 +139,8 @@ public class ThetaSketchTest extends 
CustomDataQueryClusterIntegrationTest {
           record.put(THETA_SKETCH, 
ByteBuffer.wrap(sketch.compact().toByteArray()));
 
           // add avro record to file
-          fileWriter.append(record);
+          writers.get(recordId % getNumAvroFiles()).append(record);
+          recordId++;
         }
 
         // [course dimension] calculate theta sketches & add them to avro file
@@ -164,12 +162,12 @@ public class ThetaSketchTest extends 
CustomDataQueryClusterIntegrationTest {
           record.put(THETA_SKETCH, 
ByteBuffer.wrap(sketch.compact().toByteArray()));
 
           // add avro record to file
-          fileWriter.append(record);
+          writers.get(recordId % getNumAvroFiles()).append(record);
+          recordId++;
         }
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-
-    return List.of(avroFile);
   }
 
   @Test(dataProvider = "useV1QueryEngine")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
index b86f384bc46..f218edf6423 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
@@ -27,7 +27,6 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.common.function.DateTimeUtils;
 import org.apache.pinot.common.function.scalar.DateTimeFunctions;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -89,8 +88,8 @@ public class TimestampTest extends 
CustomDataQueryClusterIntegrationTest {
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query =
-        String.format("SELECT tsBase, tsHalfDayAfter, 
longBase,longHalfDayAfter FROM %s LIMIT %d", getTableName(),
-            getCountStarResult());
+        String.format("SELECT tsBase, tsHalfDayAfter, 
longBase,longHalfDayAfter FROM %s ORDER BY tsBase LIMIT %d",
+            getTableName(), getCountStarResult());
     JsonNode jsonNode = postQuery(query);
     long expectedTsBase = DateTimeFunctions.fromDateTime("2019-01-01 
00:00:00", "yyyy-MM-dd HH:mm:ss");
     long expectedTsHalfDayAfter = DateTimeFunctions.fromDateTime("2019-01-01 
12:00:00", "yyyy-MM-dd HH:mm:ss");
@@ -489,12 +488,10 @@ public class TimestampTest extends 
CustomDataQueryClusterIntegrationTest {
         new Field(YYYY_MM_DD_ONE_YEAR_AFTER, create(Type.STRING), null, null)
     ));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
     ISOChronology chronology = ISOChronology.getInstanceUTC();
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      long tsBaseLong = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00", 
"yyyy-MM-dd HH:mm:ss");
+    long tsBaseLong = DateTimeFunctions.fromDateTime("2019-01-01 00:00:00", 
"yyyy-MM-dd HH:mm:ss");
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // Generate data
         long tsHalfDayAfter = DateTimeUtils.getTimestampField(chronology, 
"HOUR").add(tsBaseLong, 12);
@@ -529,10 +526,10 @@ public class TimestampTest extends 
CustomDataQueryClusterIntegrationTest {
         record.put(YYYY_MM_DD_ONE_YEAR_AFTER, 
DateTimeFunctions.toDateTime(tsOneYearAfter, "yyyy-MM-dd"));
 
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
         tsBaseLong += 86400000;
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 }
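
TimestampTest now orders its selection query by tsBase: with the data spread across multiple segments and servers, a plain LIMIT query no longer returns rows in generation order, so assertions that walk rows in that order need an explicit sort key. A hedged sketch of the pattern follows; the table name and limit are placeholders for what the test derives from getTableName() and getCountStarResult().

    // Illustrative sketch, not code from the commit.
    final class DeterministicSelectionSketch {
      private DeterministicSelectionSketch() {
      }

      // Without the ORDER BY, rows may arrive from either segment/server in any order.
      static String deterministicSelection(String tableName, long limit) {
        return String.format(
            "SELECT tsBase, tsHalfDayAfter, longBase, longHalfDayAfter FROM %s ORDER BY tsBase LIMIT %d",
            tableName, limit);
      }
    }
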
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
index dac31032cd2..e7408295f11 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
@@ -23,10 +23,8 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
 import java.util.List;
-import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.datasketches.tuple.Intersection;
 import org.apache.datasketches.tuple.Sketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSketch;
@@ -289,21 +287,18 @@ public class TupleSketchTest extends 
CustomDataQueryClusterIntegrationTest {
         new org.apache.avro.Schema.Field(MET_TUPLE_SKETCH_BYTES, 
org.apache.avro.Schema.create(
             org.apache.avro.Schema.Type.BYTES), null, null)));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      Random random = new Random();
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
-        record.put(ID, random.nextInt(10));
+        record.put(ID, RANDOM.nextInt(10));
         record.put(MET_TUPLE_SKETCH_BYTES, 
ByteBuffer.wrap(getRandomRawValue()));
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   private byte[] getRandomRawValue() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
index 0c294e7de8e..18fd9296ba2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
@@ -24,10 +24,8 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
 import java.util.List;
-import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -55,7 +53,7 @@ public class ULLTest extends 
CustomDataQueryClusterIntegrationTest {
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     String query = String.format("SELECT DISTINCT_COUNT_ULL(%s), 
DISTINCT_COUNT_RAW_ULL(%s) FROM %s",
-       COLUMN, COLUMN, getTableName());
+        COLUMN, COLUMN, getTableName());
     JsonNode jsonNode = postQuery(query);
     long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
     byte[] rawSketchBytes = 
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
@@ -69,13 +67,18 @@ public class ULLTest extends 
CustomDataQueryClusterIntegrationTest {
   public void testUnionWithSketchQueries(boolean useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-
     String query = String.format(
-        "SELECT " + "DISTINCT_COUNT_ULL(%s), " + "DISTINCT_COUNT_RAW_ULL(%s) " 
+ "FROM " + "("
-            + "SELECT %s FROM %s WHERE %s = 4 " + "UNION ALL " + "SELECT %s 
FROM %s WHERE %s = 5 " + "UNION ALL "
-            + "SELECT %s FROM %s WHERE %s = 6 " + "UNION ALL " + "SELECT %s 
FROM %s WHERE %s = 7 " + ")",
-        COLUMN, COLUMN, COLUMN, getTableName(), ID, COLUMN,
-        getTableName(), ID, COLUMN, getTableName(), ID, COLUMN, 
getTableName(), ID);
+        "SELECT " + "DISTINCT_COUNT_ULL(%s), " + "DISTINCT_COUNT_RAW_ULL(%s) "
+            + "FROM " + "("
+            + "SELECT %s FROM %s WHERE %s = 4 " + "UNION ALL "
+            + "SELECT %s FROM %s WHERE %s = 5 " + "UNION ALL "
+            + "SELECT %s FROM %s WHERE %s = 6 " + "UNION ALL "
+            + "SELECT %s FROM %s WHERE %s = 7 " + ")",
+        COLUMN, COLUMN,
+        COLUMN, getTableName(), ID,
+        COLUMN, getTableName(), ID,
+        COLUMN, getTableName(), ID,
+        COLUMN, getTableName(), ID);
     JsonNode jsonNode = postQuery(query);
     long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
     byte[] rawSketchBytes = 
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
@@ -132,21 +135,18 @@ public class ULLTest extends 
CustomDataQueryClusterIntegrationTest {
             null), new org.apache.avro.Schema.Field(COLUMN,
             org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES), 
null, null)));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      Random random = new Random();
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
-        record.put(ID, random.nextInt(10));
+        record.put(ID, RANDOM.nextInt(10));
         record.put(COLUMN, ByteBuffer.wrap(getRandomRawValue()));
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   private byte[] getRandomRawValue() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
index 1587aa382a9..87cd6f33ac2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/UnnestIntegrationTest.java
@@ -19,13 +19,10 @@
 package org.apache.pinot.integration.tests.custom;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import java.io.File;
 import java.util.List;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -366,32 +363,25 @@ public class UnnestIntegrationTest extends 
CustomDataQueryClusterIntegrationTest
             null, null)
     ));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    Cache<Integer, GenericData.Record> recordCache = 
CacheBuilder.newBuilder().build();
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
+        // create avro record
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COLUMN, i);
+        record.put(LONG_COLUMN, i);
+        record.put(FLOAT_COLUMN, i + RANDOM.nextFloat());
+        record.put(DOUBLE_COLUMN, i + RANDOM.nextDouble());
+        record.put(STRING_COLUMN, RandomStringUtils.insecure().next(i));
+        record.put(TIMESTAMP_COLUMN, i);
+        record.put(GROUP_BY_COLUMN, String.valueOf(i % 10));
+        record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
+        record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
+        record.put(STRING_ARRAY_COLUMN, List.of("a", "b", "c"));
         // add avro record to file
-        int finalI = i;
-        fileWriter.append(recordCache.get(i, () -> {
-              // create avro record
-              GenericData.Record record = new GenericData.Record(avroSchema);
-              record.put(INT_COLUMN, finalI);
-              record.put(LONG_COLUMN, finalI);
-              record.put(FLOAT_COLUMN, finalI + RANDOM.nextFloat());
-              record.put(DOUBLE_COLUMN, finalI + RANDOM.nextDouble());
-              record.put(STRING_COLUMN, 
RandomStringUtils.insecure().next(finalI));
-              record.put(TIMESTAMP_COLUMN, finalI);
-              record.put(GROUP_BY_COLUMN, String.valueOf(finalI % 10));
-              record.put(LONG_ARRAY_COLUMN, List.of(0, 1, 2, 3));
-              record.put(DOUBLE_ARRAY_COLUMN, List.of(0.0, 0.1, 0.2, 0.3));
-              record.put(STRING_ARRAY_COLUMN, List.of("a", "b", "c"));
-              return record;
-            }
-        ));
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
index 2cbee3a2c2e..5f9b0a25d02 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
@@ -24,11 +24,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.stream.IntStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.function.scalar.VectorFunctions;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -291,10 +289,8 @@ public class VectorTest extends 
CustomDataQueryClusterIntegrationTest {
             null, null)
     ));
 
-    // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
+    try (AvroFilesAndWriters avroFilesAndWriters = 
createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = 
avroFilesAndWriters.getWriters();
       for (int i = 0; i < getCountStarResult(); i++) {
         // create avro record
         GenericData.Record record = new GenericData.Record(avroSchema);
@@ -316,10 +312,10 @@ public class VectorTest extends 
CustomDataQueryClusterIntegrationTest {
         record.put(VECTOR_ZERO_L2_DIST, VectorFunctions.l2Distance(vector1, 
zeroVector));
 
         // add avro record to file
-        fileWriter.append(record);
+        writers.get(i % getNumAvroFiles()).append(record);
       }
+      return avroFilesAndWriters.getAvroFiles();
     }
-    return List.of(avroFile);
   }
 
   private float[] createZeroVector(int vectorDimSize) {
@@ -330,9 +326,8 @@ public class VectorTest extends 
CustomDataQueryClusterIntegrationTest {
 
   private float[] createRandomVector(int vectorDimSize) {
     float[] vector = new float[vectorDimSize];
-    Random random = new Random();
     for (int i = 0; i < vectorDimSize; i++) {
-      vector[i] = random.nextFloat();
+      vector[i] = RANDOM.nextFloat();
     }
     return vector;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index cd93002c02d..612847e0270 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.File;
 import java.util.List;
-import java.util.Random;
 import org.apache.pinot.integration.tests.window.utils.WindowFunnelUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.annotations.Test;
@@ -472,7 +471,7 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
   public void testFunnelMatchStepWithMultiThreadsReduce(boolean 
useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    int numThreadsExtractFinalResult = 2 + new Random().nextInt(10);
+    int numThreadsExtractFinalResult = 2 + RANDOM.nextInt(10);
     LOGGER.info("Running testFunnelMatchStepWithMultiThreadsReduce with 
numThreadsExtractFinalResult: {}",
         numThreadsExtractFinalResult);
     String query =
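
WindowFunnelTest above and TPCHQueryGeneratorV2 below replace per-call new Random() instances with a shared RANDOM field. A minimal sketch of that pattern is shown here, assuming an unseeded Random; the actual fields and any seeding live in the test base class and in TPCHQueryGeneratorV2.

    import java.util.Random;

    // Illustrative sketch only.
    final class SharedRandomSketch {
      // One shared instance instead of constructing a new Random inside every helper call; this
      // avoids re-seeding from the clock on each call and keeps the generated values independent
      // of how often the helpers are invoked.
      private static final Random RANDOM = new Random();

      private SharedRandomSketch() {
      }

      static int randomIndex(int bound) {
        return RANDOM.nextInt(bound);
      }
    }
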
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
index 678dee8a3ee..02dd39d0778 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/tpch/generator/TPCHQueryGeneratorV2.java
@@ -30,6 +30,7 @@ import org.codehaus.jettison.json.JSONException;
 
 
 public class TPCHQueryGeneratorV2 {
+  private static final Random RANDOM = new Random();
   private static final Map<String, Table> TABLES_MAP = new HashMap<>();
   private static final List<String> TABLE_NAMES =
       List.of("nation", "region", "supplier", "customer", "part", "partsupp", 
"orders", "lineitem");
@@ -48,8 +49,7 @@ public class TPCHQueryGeneratorV2 {
   }
 
   private static Table getRandomTable() {
-    Random random = new Random();
-    int index = random.nextInt(TABLES_MAP.size());
+    int index = RANDOM.nextInt(TABLES_MAP.size());
     return TABLES_MAP.get(TABLE_NAMES.get(index));
   }
 
@@ -138,12 +138,11 @@ public class TPCHQueryGeneratorV2 {
   }
 
   private List<String> getRandomProjections(Table t1) {
-    Random random = new Random();
-    int numColumns = random.nextInt(t1.getColumns().size()) + 1;
+    int numColumns = RANDOM.nextInt(t1.getColumns().size()) + 1;
     List<String> selectedColumns = new ArrayList<>();
 
     while (selectedColumns.size() < numColumns) {
-      String columnName = 
t1.getColumns().get(random.nextInt(t1.getColumns().size())).getColumnName();
+      String columnName = 
t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size())).getColumnName();
       if (!selectedColumns.contains(columnName)) {
         selectedColumns.add(columnName);
       }
@@ -155,12 +154,11 @@ public class TPCHQueryGeneratorV2 {
   private String generateInnerQueryForPredicate(Table t1, Column c) {
     QuerySkeleton innerQuery = new QuerySkeleton();
 
-    Random random = new Random();
     List<String> predicates = new ArrayList<>();
     innerQuery.addTable(t1.getTableName());
     // Limit to maximum of 1 join
-    if (random.nextBoolean()) {
-      RelatedTable relatedTable = 
t1.getRelatedTables().get(random.nextInt(t1.getRelatedTables().size()));
+    if (RANDOM.nextBoolean()) {
+      RelatedTable relatedTable = 
t1.getRelatedTables().get(RANDOM.nextInt(t1.getRelatedTables().size()));
       if (relatedTable != null) {
         innerQuery.addTable(relatedTable.getForeignTableName());
         predicates.add(relatedTable.getLocalTableKey() + "=" + 
relatedTable.getForeignTableKey());
@@ -169,7 +167,7 @@ public class TPCHQueryGeneratorV2 {
         predicates.addAll(inp);
       }
     }
-    String aggregation = 
c.getColumnType()._aggregations.get(random.nextInt(c.getColumnType()._aggregations.size()));
+    String aggregation = 
c.getColumnType()._aggregations.get(RANDOM.nextInt(c.getColumnType()._aggregations.size()));
     innerQuery.addProjection(aggregation + "(" + c.getColumnName() + ")");
 
     List<String> inp = getRandomPredicates(t1, false);
@@ -181,8 +179,7 @@ public class TPCHQueryGeneratorV2 {
   }
 
   private String getRandomValueForPredicate(Table t1, Column c, boolean useNextedQueries) {
-    Random random = new Random();
-    if (random.nextBoolean() && useNextedQueries && 
c.getColumnType()._aggregations.size() > 0) {
+    if (RANDOM.nextBoolean() && useNextedQueries && 
c.getColumnType()._aggregations.size() > 0) {
       // Use nested query for predicate
       String nestedQueries = generateInnerQueryForPredicate(t1, c);
       return "(" + nestedQueries + ")";
@@ -196,15 +193,14 @@ public class TPCHQueryGeneratorV2 {
   }
 
   private List<String> getRandomPredicates(Table t1, boolean useNestedQueries) {
-    Random random = new Random();
-    int predicateCount = random.nextInt(5) + 1;
+    int predicateCount = RANDOM.nextInt(5) + 1;
     List<String> predicates = new ArrayList<>();
     List<String> results = new ArrayList<>();
     while (predicates.size() < predicateCount) {
-      Column column = t1.getColumns().get(random.nextInt(t1.getColumns().size()));
+      Column column = t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size()));
       predicates.add(column.getColumnName());
       ColumnType columnType = column.getColumnType();
-      String operator = columnType._operators.get(random.nextInt(columnType._operators.size()));
+      String operator = columnType._operators.get(RANDOM.nextInt(columnType._operators.size()));
       String value = getRandomValueForPredicate(t1, column, useNestedQueries);
       String predicateBuilder = column.getColumnName() + " " + operator + " " + value + " ";
       results.add(predicateBuilder);
@@ -218,17 +214,16 @@ public class TPCHQueryGeneratorV2 {
   }
 
   private List<String> getRandomOrderBys(Table t1) {
-    Random random = new Random();
-    int orderByCount = random.nextInt(2) + 1;
+    int orderByCount = RANDOM.nextInt(2) + 1;
     List<String> orderBys = new ArrayList<>();
     List<String> results = new ArrayList<>();
     while (orderBys.size() < orderByCount) {
-      Column column = t1.getColumns().get(random.nextInt(t1.getColumns().size()));
+      Column column = t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size()));
       orderBys.add(column.getColumnName());
       String name = column.getColumnName();
       StringBuilder orderByBuilder = new StringBuilder();
       orderByBuilder.append(name).append(" ");
-      if (random.nextBoolean()) {
+      if (RANDOM.nextBoolean()) {
         orderByBuilder.append(" DESC ");
       }
       results.add(orderByBuilder.toString());
@@ -261,15 +256,14 @@ public class TPCHQueryGeneratorV2 {
     if (groupByCols.size() == 0) {
       return result;
     }
-    Random random = new Random();
     List<String> orderBys = new ArrayList<>();
-    int orderByCount = random.nextInt(groupByCols.size()) + 1;
+    int orderByCount = RANDOM.nextInt(groupByCols.size()) + 1;
     while (orderBys.size() < orderByCount) {
-      String column = groupByCols.get(random.nextInt(groupByCols.size()));
+      String column = groupByCols.get(RANDOM.nextInt(groupByCols.size()));
 
       if (groupByCols.contains(column)) {
         orderBys.add(column);
-        if (random.nextBoolean()) {
+        if (RANDOM.nextBoolean()) {
           result.add(column + " DESC");
         } else {
           result.add(column);
@@ -291,14 +285,13 @@ public class TPCHQueryGeneratorV2 {
       }
     }
 
-    Random random = new Random();
-    RelatedTable rt = t1.getRelatedTables().get(random.nextInt(t1.getRelatedTables().size()));
+    RelatedTable rt = t1.getRelatedTables().get(RANDOM.nextInt(t1.getRelatedTables().size()));
     Table t2 = TABLES_MAP.get(rt.getForeignTableName());
     getRandomProjections(t1).forEach(querySkeleton::addProjection);
     getRandomProjections(t2).forEach(querySkeleton::addProjection);
 
     String t2NameWithJoin =
-        t1.getTableName() + " " + 
JOIN_TYPES[random.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
+        t1.getTableName() + " " + 
JOIN_TYPES[RANDOM.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
             + rt.getLocalTableKey() + " = " + rt.getForeignTableKey() + " ";
     querySkeleton.addTable(t2NameWithJoin);
 
@@ -318,20 +311,19 @@ public class TPCHQueryGeneratorV2 {
   }
 
   private Pair<List<String>, List<String>> getGroupByAndAggregates(Table t1) {
-    Random random = new Random();
-    int numColumns = random.nextInt(t1.getColumns().size()) + 1;
+    int numColumns = RANDOM.nextInt(t1.getColumns().size()) + 1;
     List<String> selectedColumns = new ArrayList<>();
     List<String> groupByColumns = new ArrayList<>();
     List<String> resultProjections = new ArrayList<>();
 
     while (selectedColumns.size() < numColumns) {
-      Column column = t1.getColumns().get(random.nextInt(t1.getColumns().size()));
+      Column column = t1.getColumns().get(RANDOM.nextInt(t1.getColumns().size()));
       String columnName = column.getColumnName();
       if (!selectedColumns.contains(columnName)) {
-        if (random.nextBoolean() && 
column.getColumnType()._aggregations.size() > 0) {
+        if (RANDOM.nextBoolean() && 
column.getColumnType()._aggregations.size() > 0) {
           // Use as aggregation
           String aggregation =
-              column.getColumnType()._aggregations.get(random.nextInt(column.getColumnType()._aggregations.size()));
+              column.getColumnType()._aggregations.get(RANDOM.nextInt(column.getColumnType()._aggregations.size()));
           resultProjections.add(aggregation + "(" + columnName + ")");
         } else {
           // Use as group by
@@ -377,8 +369,7 @@ public class TPCHQueryGeneratorV2 {
       }
     }
 
-    Random random = new Random();
-    RelatedTable rt = t1.getRelatedTables().get(random.nextInt(t1.getRelatedTables().size()));
+    RelatedTable rt = t1.getRelatedTables().get(RANDOM.nextInt(t1.getRelatedTables().size()));
     Table t2 = TABLES_MAP.get(rt.getForeignTableName());
     Pair<List<String>, List<String>> groupByColumns = getGroupByAndAggregates(t1);
     groupByColumns.getLeft().forEach(querySkeleton::addProjection);
@@ -387,7 +378,7 @@ public class TPCHQueryGeneratorV2 {
     groupByColumnsT2.getLeft().forEach(querySkeleton::addProjection);
 
     String tName =
-        t1.getTableName() + "  " + 
JOIN_TYPES[random.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
+        t1.getTableName() + "  " + 
JOIN_TYPES[RANDOM.nextInt(JOIN_TYPES.length)] + " " + t2.getTableName() + " ON "
             + " " + rt.getLocalTableKey() + " = " + " " + 
rt.getForeignTableKey() + " ";
 
     querySkeleton.addTable(tName);
@@ -415,7 +406,6 @@ public class TPCHQueryGeneratorV2 {
     List<Table> tables = new ArrayList<>();
     Set<String> tableNames = new HashSet<>();
 
-    Random random = new Random();
 
     // Start off with a random table with related tables
     while (true) {
@@ -428,10 +418,10 @@ public class TPCHQueryGeneratorV2 {
     }
 
     // Add more tables
-    while (random.nextInt() % 8 != 0) {
-      int tableToAddIdx = random.nextInt(tables.size());
+    while (RANDOM.nextInt() % 8 != 0) {
+      int tableToAddIdx = RANDOM.nextInt(tables.size());
       RelatedTable relatedTable = tables.get(tableToAddIdx).getRelatedTables()
-          .get(random.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
+          .get(RANDOM.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
       if (!tableNames.contains(relatedTable.getForeignTableName())) {
         tableNames.add(relatedTable.getForeignTableName());
         tables.add(TPCHQueryGeneratorV2.TABLES_MAP.get(relatedTable.getForeignTableName()));
@@ -475,7 +465,6 @@ public class TPCHQueryGeneratorV2 {
     List<Table> tables = new ArrayList<>();
     Set<String> tableNames = new HashSet<>();
 
-    Random random = new Random();
 
     // Start off with a random table with related tables
     while (true) {
@@ -488,10 +477,10 @@ public class TPCHQueryGeneratorV2 {
     }
 
     // Add more tables
-    while (random.nextInt() % 8 != 0) {
-      int tableToAddIdx = random.nextInt(tables.size());
+    while (RANDOM.nextInt() % 8 != 0) {
+      int tableToAddIdx = RANDOM.nextInt(tables.size());
       RelatedTable relatedTable = tables.get(tableToAddIdx).getRelatedTables()
-          .get(random.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
+          .get(RANDOM.nextInt(tables.get(tableToAddIdx).getRelatedTables().size()));
       if (!tableNames.contains(relatedTable.getForeignTableName())) {
         tableNames.add(relatedTable.getForeignTableName());
         tables.add(TPCHQueryGeneratorV2.TABLES_MAP.get(relatedTable.getForeignTableName()));
@@ -529,9 +518,8 @@ public class TPCHQueryGeneratorV2 {
   }
 
   public String generateRandomQuery() {
-    Random random = new Random();
-    int queryType = random.nextInt(6);
-    boolean includePredicates = random.nextBoolean();
+    int queryType = RANDOM.nextInt(6);
+    boolean includePredicates = RANDOM.nextBoolean();
     boolean includeOrderBy = true;
     switch (queryType) {
       case 0:


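Note (not part of the commit): every TPCHQueryGeneratorV2 hunk above applies the same pattern, replacing a new Random() allocated inside each helper with the single shared static RANDOM field. A minimal, self-contained sketch of that pattern follows; the class and method names here are illustrative only.

    import java.util.List;
    import java.util.Random;

    // Illustrative sketch of the refactor above: helpers draw from one shared
    // static Random instead of constructing a new Random() on every call, so a
    // seed could later be fixed in a single place if reproducible query
    // generation is ever needed.
    public class SharedRandomSketch {
      private static final Random RANDOM = new Random();

      private static final List<String> TABLE_NAMES =
          List.of("nation", "region", "supplier", "customer", "part", "partsupp", "orders", "lineitem");

      // Picks a random table name using the shared generator.
      static String getRandomTableName() {
        return TABLE_NAMES.get(RANDOM.nextInt(TABLE_NAMES.size()));
      }

      public static void main(String[] args) {
        System.out.println(getRandomTableName());
      }
    }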