This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b2760be562e Add benchmark for equi-join queries with various configurations (#16825)
b2760be562e is described below

commit b2760be562eac82ec7e5ca2eff06e357ad2a0ffe
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Fri Sep 26 17:07:19 2025 +0200

    Add benchmark for equi-join queries with various configurations (#16825)
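
    For reference, a minimal sketch of how the @Param values could be overridden
    programmatically through JMH's OptionsBuilder (the launcher class name and the
    parameter values below are illustrative, not part of this commit):

        import org.openjdk.jmh.runner.Runner;
        import org.openjdk.jmh.runner.options.Options;
        import org.openjdk.jmh.runner.options.OptionsBuilder;

        // Hypothetical launcher; the benchmark is matched by its simple name.
        public final class RunEquiJoinBenchmark {
          public static void main(String[] args) throws Exception {
            Options opt = new OptionsBuilder()
                .include("BenchmarkEquiJoin")
                .param("_segments", "20")          // overrides @Param({"10"})
                .param("_uRatio", "0.1", "0.01")   // values from the commented-out @Param
                .build();
            new Runner(opt).run();
          }
        }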
---
 .../org/apache/pinot/perf/BenchmarkEquiJoin.java   | 316 +++++++++++++++++++++
 .../java/org/apache/pinot/perf/Distribution.java   |   3 +-
 .../src/main/resources/log4j2-test.properties      |  53 ++++
 3 files changed, 370 insertions(+), 2 deletions(-)

diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkEquiJoin.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkEquiJoin.java
new file mode 100644
index 00000000000..507e26aea41
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkEquiJoin.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 10, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkEquiJoin extends BaseClusterIntegrationTest {
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder()
+        .include(BenchmarkEquiJoin.class.getSimpleName())
+        .addProfiler(GCProfiler.class);
+    new Runner(opt.build()).run();
+  }
+
+  /// Number of rows in each segment
+  private int _rowsPerSegment = 100_000;
+  @Param({"10"})
+  private int _segments;
+
+  /// The ratio (in the range of `(0, 1]`) of unique rows in the table.
+  ///
+  /// The lower the [#_uRatio], the more rows will match in the join.
+  /// The requested uniqueness is not guaranteed. Instead, it is statistically approximated by using a
+  /// random number generator to create the data.
+  //@Param({"1", "0.1", "0.01"})
+  @Param({"1"})
+  private double _uRatio;
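+  // Illustrative arithmetic (assumed defaults, not enforced anywhere): with _rowsPerSegment = 100_000,
+  // _segments = 10 and _uRatio = 0.01, the supplier draws from ~10_000 distinct values,
+  // so each join key repeats ~100 times on average.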
+
+  private JsonNode query(String query)
+      throws Exception {
+    JsonNode result = postQuery(query,
+        ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true),
+        null,
+        getExtraQueryProperties());
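+    // The broker response carries an "exceptions" array; surface the first one, if any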
+    JsonNode firstException = result.get("exceptions").get(0);
+    if (firstException != null) {
+      throw new RuntimeException(firstException.get("message").asText());
+    }
+    return result.get("resultTable").get("rows");
+  }
+
+  @Override
+  protected Map<String, String> getExtraQueryProperties() {
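+    // useMultistageEngine forces the multi-stage (v2) query engine; dropResults is
+    // assumed here to omit result rows from the broker response, keeping the
+    // measurement focused on execution rather than result serialization.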
+    return Map.of("useMultistageEngine", "true", "dropResults", "true");
+  }
+
+  /// Executes a query where [#_rowsPerSegment] left rows are joined by int with [#_rowsPerSegment]/100 right rows
+  /// and then counted
+  @Benchmark
+  public JsonNode countJoinOneHundredthInt()
+      throws Exception {
+    @Language("sql")
+    String query = "SET useMultistageEngine=true;"
+        + "SET maxRowsInJoin=1000000000;"
+        + "SELECT count(*) "
+        + "FROM MyTable t1 "
+        + "JOIN MyTable t2 "
+        + "ON t1.intCol = t2.intCol "
+        + "WHERE t2.intCol % 10 = 0";
+    ;
+    return query(query);
+  }
+
+  /// Executes a query where [#_rowsPerSegment] left rows are joined by str with [#_rowsPerSegment]/100 right rows
+  /// and then counted
+  @Benchmark
+  public JsonNode countJoinOneHundredthStr()
+      throws Exception {
+    @Language("sql")
+    String query = "SET useMultistageEngine=true;"
+        + "SELECT COUNT(*) "
+        + "FROM MyTable t1 "
+        + "JOIN MyTable t2 "
+        + "ON t1.strCol = t2.strCol "
+        + "WHERE t2.intCol % 100 = 0";
+    return query(query);
+  }
+
+  /// Executes a query where [#_rowsPerSegment] left rows are joined by int with [#_rowsPerSegment]/100 right rows
+  /// and then all rows projected
+  @Benchmark
+  public JsonNode projectJoinOneHundredthInt()
+      throws Exception {
+    @Language("sql")
+    String query = "SET useMultistageEngine=true;"
+        + "SELECT * "
+        + "FROM MyTable t1 "
+        + "JOIN MyTable t2 "
+        + "ON t1.intCol = t2.intCol "
+        + "WHERE t2.intCol % 100 = 0";
+    return query(query);
+  }
+
+  /// Executes a query where [#_rowsPerSegment] left rows are joined by str with [#_rowsPerSegment]/100 right rows
+  /// and then all rows projected
+  @Benchmark
+  public JsonNode projectJoinOneHundredthStr()
+      throws Exception {
+    @Language("sql")
+    String query = "SET useMultistageEngine=true;"
+        + "SELECT * "
+        + "FROM MyTable t1 "
+        + "JOIN MyTable t2 "
+        + "ON t1.strCol = t2.strCol "
+        + "WHERE t2.intCol % 100 = 0";
+    return query(query);
+  }
+
+  @Setup
+  public void setUp()
+      throws Exception {
+
+    Distribution.DataSupplier supplier = createSupplier();
+
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    CompletableFuture<?> clusterStarted = CompletableFuture.runAsync(() -> {
+      try {
+        // Start the Pinot cluster
+        startZk();
+        startController();
+        startBroker();
+        startServers(2);
+
+        // Upload test data
+        addSchema(SCHEMA);
+        addTableConfig(TABLE_CONFIG);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+
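+    // Build all segments concurrently while the cluster starts up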
+    CompletableFuture<?>[] futures = new CompletableFuture<?>[_segments];
+    for (int i = 0; i < _segments; i++) {
+      futures[i] = buildSegment("segment" + i, supplier);
+    }
+    CompletableFuture.allOf(futures).join();
+
+    clusterStarted.join();
+    uploadSegments(TABLE_NAME, _tarDir);
+
+    // Wait for all uploaded documents to become queryable (60s timeout)
+    waitForAllDocsLoaded(60000);
+  }
+
+  @Override
+  protected long getCountStarResult() {
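+    // Multiply as long to avoid int overflow for large row counts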
+    return _rowsPerSegment * (long) _segments;
+  }
+
+  private CompletableFuture<?> buildSegment(String segmentName, Distribution.DataSupplier supplier) {
+    return CompletableFuture.runAsync(() -> {
+      try {
+        LazyDataGenerator rows = createDataGenerator(supplier);
+        SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+        config.setOutDir(_segmentDir.getPath());
+        config.setTableName(TABLE_NAME);
+        config.setSegmentName(segmentName);
+
+        SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+        try (RecordReader recordReader = new GeneratedDataRecordReader(rows)) {
+          driver.init(config, recordReader);
+          driver.build();
+        }
+        // Save the generator state so that other segments are not identical to this one
+        supplier.snapshot();
+
+        // Tar the segment
+        File indexDir = new File(_segmentDir, segmentName);
+        File segmentTarFile = new File(_tarDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  private LazyDataGenerator createDataGenerator(Distribution.DataSupplier supplier) {
+    return new LazyDataGenerator() {
+      Distribution.DataSupplier _supplier = supplier.clone();
+
+      @Override
+      public int size() {
+        return _rowsPerSegment;
+      }
+
+      @Override
+      public GenericRow next(GenericRow row, int index) {
+        long randomValue = _supplier.getAsLong();
+        row.putValue(INT_COL_NAME, (int) randomValue);
+        row.putValue(LONG_COL_NAME, randomValue);
+        row.putValue(STRING_COL_NAME, "value" + randomValue);
+        return row;
+      }
+
+      @Override
+      public void rewind() {
+        _supplier = supplier.clone();
+      }
+    };
+  }
+
+  private Distribution.DataSupplier createSupplier() {
+    int actualTableRows = _rowsPerSegment * _segments;
+    Preconditions.checkState(_uRatio > 0 && _uRatio <= 1.0, "_uRatio must be in the range of (0, 1]");
+    int uniqueRows = _uRatio == 1 ? actualTableRows : (int) (actualTableRows * _uRatio);
+
+    return Distribution.UNIFORM.createSupplier(42, 0, uniqueRows);
+  }
+
+  @TearDown
+  public void tearDown()
+      throws IOException {
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteQuietly(_tempDir);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    super.overrideBrokerConf(brokerConf);
+    brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    super.overrideServerConf(serverConf);
+    serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+  }
+
+  private static final String TABLE_NAME = "MyTable";
+  private static final String INT_COL_NAME = "intCol";
+  private static final String LONG_COL_NAME = "longCol";
+  private static final String STRING_COL_NAME = "strCol";
+  private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
+
+  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+      .setInvertedIndexColumns(List.of(INT_COL_NAME, LONG_COL_NAME, STRING_COL_NAME))
+      .setFieldConfigList(FIELD_CONFIGS)
+      .build();
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+      .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+      .addSingleValueDimension(LONG_COL_NAME, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(STRING_COL_NAME, FieldSpec.DataType.STRING)
+      .build();
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java b/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
index 8902e281717..406ea67cd7a 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
@@ -114,8 +114,7 @@ public enum Distribution {
     }
 
     @Override
-    protected Object clone()
-    throws CloneNotSupportedException {
+    protected DataSupplier clone() {
       return new DataSupplier(clone(_random), _generator);
     }
 
diff --git a/pinot-perf/src/main/resources/log4j2-test.properties b/pinot-perf/src/main/resources/log4j2-test.properties
new file mode 100644
index 00000000000..2cb8ada276f
--- /dev/null
+++ b/pinot-perf/src/main/resources/log4j2-test.properties
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+status = error
+name = PropertiesConfig
+
+# Appenders
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_OUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n
+
+appender.controllerLog.type = RandomAccessFile
+appender.controllerLog.name = controllerLog
+appender.controllerLog.fileName = pinotController.log
+appender.controllerLog.immediateFlush = false
+appender.controllerLog.layout.type = PatternLayout
+appender.controllerLog.layout.pattern = %d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n
+
+appender.brokerLog.type = RandomAccessFile
+appender.brokerLog.name = brokerLog
+appender.brokerLog.fileName = pinotBroker.log
+appender.brokerLog.immediateFlush = false
+appender.brokerLog.layout.type = PatternLayout
+appender.brokerLog.layout.pattern = %d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n
+
+appender.serverLog.type = RandomAccessFile
+appender.serverLog.name = serverLog
+appender.serverLog.fileName = pinotServer.log
+appender.serverLog.immediateFlush = false
+appender.serverLog.layout.type = PatternLayout
+appender.serverLog.layout.pattern = %d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n
+
+# Loggers
+rootLogger.level = error
+rootLogger.appenderRef.console.ref = console
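+
+# Note: the file appenders above are declared but not attached to any logger in
+# this file; only the console appender is active via rootLogger. Additional
+# loggers can reference them if per-component log files are desired.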


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]