pingtimeout commented on code in PR #21:
URL: https://github.com/apache/polaris-tools/pull/21#discussion_r2131657209


##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala:
##########
@@ -184,6 +184,7 @@ case class TableActions(
     http("Fetch Table")
       
.get("/api/catalog/v1/#{catalogName}/namespaces/#{multipartNamespace}/tables/#{tableName}")
       .header("Authorization", "Bearer #{accessToken}")
+      .header("If-None-Match", "")

Review Comment:
   Is it possible this is a leftover from the previous discussion about 
disabling caching?



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset 
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+    seed: Int,
+    readers: Seq[Distribution],
+    writers: Seq[Distribution],
+    durationInMinutes: Int
+) {
+  require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer 
is required")
+  require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+  def loadDistributionsList(config: Config, key: String): List[Distribution] =
+    config.getConfigList(key).asScala.toList.map { conf =>
+      Distribution(
+        count = conf.getInt("count"),
+        mean = conf.getDouble("mean"),
+        variance = conf.getDouble("variance")
+      )
+    }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  def printDescription(dataset: DatasetParameters): Unit = {
+    println(s"Summary for ${this}:")
+
+    // Visualize distributions
+    printVisualization(dataset.maxPossibleTables)
+
+    // Warn if a large amount of resampling will be needed
+    val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)

Review Comment:
   It would be better if the seed used for the RNG was the one configured in 
the `.conf` file.  Not a big deal though as if a distribution is bad, then most 
likely it will be bad regardless of the seed.  But still something that might 
be considered to follow the principle of least surprise



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+  ConnectionParameters,
+  DatasetParameters,
+  Distribution,
+  RandomNumberProvider,
+  WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified 
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables 
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree 
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  // 
--------------------------------------------------------------------------------
+  // Load parameters
+  // 
--------------------------------------------------------------------------------
+  val cp: ConnectionParameters = config.connectionParameters
+  val dp: DatasetParameters = config.datasetParameters
+  val wp: WorkloadParameters = config.workloadParameters
+
+  println("### Reader distributions ###")
+  wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+  println("### Writer distributions ###")
+  wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+  // 
--------------------------------------------------------------------------------
+  // Helper values
+  // 
--------------------------------------------------------------------------------
+  private val accessToken: AtomicReference[String] = new AtomicReference()
+
+  private val authActions = AuthenticationActions(cp, accessToken)
+  private val tblActions = TableActions(dp, wp, accessToken)
+
+  // 
--------------------------------------------------------------------------------
+  // Authentication related workloads
+  // 
--------------------------------------------------------------------------------
+  val refreshOauthForDuration: ScenarioBuilder =
+    scenario("Authenticate every minute using the Iceberg REST API")

Review Comment:
   nit: the name says "every minute" but the pause is 30s



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala:
##########
@@ -85,8 +86,9 @@ case class AuthenticationActions(
         .check(jsonPath("$.access_token").saveAs("accessToken"))
     )
       .exec { session =>
-        if (session.contains("accessToken"))
+        if (session.contains("accessToken") && session("accessToken") != null) 
{

Review Comment:
   This should not be possible.  See the comment in the simulation class.



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+  ConnectionParameters,
+  DatasetParameters,
+  Distribution,
+  RandomNumberProvider,
+  WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified 
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables 
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree 
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  // 
--------------------------------------------------------------------------------
+  // Load parameters
+  // 
--------------------------------------------------------------------------------
+  val cp: ConnectionParameters = config.connectionParameters
+  val dp: DatasetParameters = config.datasetParameters
+  val wp: WorkloadParameters = config.workloadParameters
+
+  println("### Reader distributions ###")
+  wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+  println("### Writer distributions ###")
+  wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+  // 
--------------------------------------------------------------------------------
+  // Helper values
+  // 
--------------------------------------------------------------------------------
+  private val accessToken: AtomicReference[String] = new AtomicReference()
+
+  private val authActions = AuthenticationActions(cp, accessToken)
+  private val tblActions = TableActions(dp, wp, accessToken)
+
+  // 
--------------------------------------------------------------------------------
+  // Authentication related workloads
+  // 
--------------------------------------------------------------------------------
+  val refreshOauthForDuration: ScenarioBuilder =
+    scenario("Authenticate every minute using the Iceberg REST API")
+      .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+        feed(authActions.feeder())
+          .exec(authActions.authenticateAndSaveAccessToken)
+          .pause(30.seconds)
+      }
+
+  val waitForAuthentication: ScenarioBuilder =
+    scenario("Wait for the authentication token to be available")
+      .asLongAs(_ => accessToken.get() == null) {
+        pause(1.second)
+      }
+
+  // 
--------------------------------------------------------------------------------
+  // Build up the HTTP protocol configuration and set up the simulation
+  // 
--------------------------------------------------------------------------------
+  private val httpProtocol = http
+    .baseUrl(cp.baseUrl)
+    .acceptHeader("application/json")
+    .contentTypeHeader("application/json")
+    .disableCaching
+
+  // 
--------------------------------------------------------------------------------
+  // Create all reader/writer scenarios and prepare them for injection
+  // 
--------------------------------------------------------------------------------
+  private val readerScenarioBuilders: List[ScenarioBuilder] =
+    wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case 
(dist, i) =>
+      (0 until dist.count).map { threadId =>
+        val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, 
i * 1000 + threadId)
+        scenario(s"Reader-$i-$threadId")
+          .exec(authActions.restoreAccessTokenInSession)
+          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+            exec { session =>
+              val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+              val (catalog, namespace, table) =
+                Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+              // Checked in `fetchTable`
+              val expectedProperties: Map[String, String] = (0 until 
dp.numTableProperties)
+                .map(id => s"InitialAttribute_$id" -> s"$id")
+                .toMap
+              val expectedLocation =
+                
s"${dp.defaultBaseLocation}/$catalog/${namespace.mkString("/")}/${table}"
+
+              session
+                .set("catalogName", catalog)
+                .set("multipartNamespace", 
namespace.mkString(0x1f.toChar.toString))
+                .set("tableName", table)
+                .set("initialProperties", expectedProperties)
+                .set("location", expectedLocation)

Review Comment:
   This part does not feel right as in all the other workloads, it is 
implemented as a feeder is the `TableActions` class.  What is the rationale 
behind having this in the simulation itself and in an `exec` block?



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala:
##########
@@ -54,7 +54,7 @@ case class DatasetParameters(
     numViewProperties: Int
 ) {
   val nAryTree: NAryTreeBuilder = NAryTreeBuilder(nsWidth, nsDepth)
-  private val maxPossibleTables = nAryTree.numberOfLastLevelElements * 
numTablesPerNs
+  val maxPossibleTables = nAryTree.numberOfLastLevelElements * numTablesPerNs

Review Comment:
   The intent behind initially `maxPossibleTables` having private visibility 
was to only use `numTables` so that the number of tables could be capped to a 
certain value.
   
   I.e. if there are 2^16 max possible tables in your dataset, but you want to 
restrict a specific workload on the first 2000 tables, you would set 
`max-tables = 2000`.  That way, you can reuse a larger dataset for more 
concentrated workloads.
   
   It does not mean that all workloads have to support this though.  I just 
want to emphasize was the initial idea was.
   
   If you want to keep using `maxPossibleTables`, would you mind adding a type 
annotation to it?  IntelliJ displays a warning when a public field has no 
annotated type and is just `var xyz = ...`.



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset 
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+    seed: Int,
+    readers: Seq[Distribution],
+    writers: Seq[Distribution],
+    durationInMinutes: Int
+) {
+  require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer 
is required")
+  require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+  def loadDistributionsList(config: Config, key: String): List[Distribution] =
+    config.getConfigList(key).asScala.toList.map { conf =>
+      Distribution(
+        count = conf.getInt("count"),
+        mean = conf.getDouble("mean"),
+        variance = conf.getDouble("variance")
+      )
+    }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  def printDescription(dataset: DatasetParameters): Unit = {
+    println(s"Summary for ${this}:")
+
+    // Visualize distributions
+    printVisualization(dataset.maxPossibleTables)
+
+    // Warn if a large amount of resampling will be needed
+    val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+    def resampleStream: LazyList[Double] =
+      LazyList.continually(sample(dataset.maxPossibleTables, 
debugRandomNumberProvider))
+
+    val (_, resamples) = resampleStream.zipWithIndex
+      .take(100000)
+      .find { case (value, _) => value >= 0 && value < 
dataset.maxPossibleTables }
+      .map { case (value, index) => (value, index) }

Review Comment:
   Is this `.map` call doing anything?



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset 
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+    seed: Int,
+    readers: Seq[Distribution],
+    writers: Seq[Distribution],
+    durationInMinutes: Int
+) {
+  require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer 
is required")
+  require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+  def loadDistributionsList(config: Config, key: String): List[Distribution] =
+    config.getConfigList(key).asScala.toList.map { conf =>
+      Distribution(
+        count = conf.getInt("count"),
+        mean = conf.getDouble("mean"),
+        variance = conf.getDouble("variance")
+      )
+    }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  def printDescription(dataset: DatasetParameters): Unit = {
+    println(s"Summary for ${this}:")
+
+    // Visualize distributions
+    printVisualization(dataset.maxPossibleTables)
+
+    // Warn if a large amount of resampling will be needed
+    val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+    def resampleStream: LazyList[Double] =
+      LazyList.continually(sample(dataset.maxPossibleTables, 
debugRandomNumberProvider))
+
+    val (_, resamples) = resampleStream.zipWithIndex
+      .take(100000)
+      .find { case (value, _) => value >= 0 && value < 
dataset.maxPossibleTables }
+      .map { case (value, index) => (value, index) }
+      .getOrElse((-1, 100000))
+
+    if (resamples > 100) {
+      logger.warn(
+        s"A distribution appears to require aggressive resampling: ${this} 
took ${resamples + 1} samples!"
+      )
+    }
+  }
+
+  /**
+   * Return a value in [0, items) based on this distribution using truncated 
normal resampling.
+   */
+  def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = {
+    val stddev = math.sqrt(variance)
+    // Resample until the value is in [0, 1]
+    val maxSamples = 100000
+    val value = Iterator
+      .continually(randomNumberProvider.next() * stddev + mean)
+      .take(maxSamples)
+      .find(x => x >= 0.0 && x <= 1.0)
+      .getOrElse(
+        throw new RuntimeException(
+          s"Failed to sample a value in [0, 1] after ${maxSamples} attempts"
+        )
+      )
+
+    (value * items).toInt.min(items - 1)
+  }
+
+  def printVisualization(tables: Int, samples: Int = 100000, bins: Int = 10): 
Unit = {
+    val binCounts = Array.fill(bins)(0)
+    val hits = new mutable.HashMap[Int, Int]()
+    val rng = RandomNumberProvider("visualization".hashCode, -1)
+
+    (1 to samples).foreach { _ =>
+      val value = sample(tables, rng)
+      val bin = ((value.toDouble / tables) * bins).toInt.min(bins - 1)
+      hits.put(value, hits.getOrElse(value, 0) + 1)
+      binCounts(bin) += 1
+    }
+
+    val maxBarWidth = 50
+    val total = binCounts.sum.toDouble
+    println("  Range         | % of Samples | Visualization")
+    println("  --------------|--------------|------------------")
+
+    (0 until bins).foreach { i =>
+      val low = i.toDouble / bins
+      val high = (i + 1).toDouble / bins
+      val percent = binCounts(i) / total * 100
+      val bar = "█" * ((percent / 100 * maxBarWidth).round.toInt)
+      println(f"  [$low%.1f - $high%.1f) | $percent%6.2f%%      | $bar")
+    }
+    println()
+
+    val mode = hits.maxBy(_._2)
+    val modePercentage: Int = Math.round(mode._2.toFloat / samples * 100)
+    println(s"  The most frequently selected table was chosen in 
~${modePercentage}% of samples")
+
+    println()
+  }
+}
+
+object Distribution {
+
+  // Map an index back to a table path
+  def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String, 
List[String], String) = {
+    require(
+      dp.numTablesMax == -1,
+      "Sampling is incompatible with numTablesMax settings other than -1"
+    )
+
+    val namespaceIndex = index / dp.numTablesPerNs
+    val namespaceOrdinal = 
dp.nAryTree.lastLevelOrdinals.toList.apply(namespaceIndex)
+    val namespacePath = dp.nAryTree.pathToRoot(namespaceOrdinal)
+
+    (s"C_0", namespacePath.map(n => s"NS_${n}"), s"T_${index}")

Review Comment:
   ```suggestion
       // TODO Refactor this line once entity names are configurable
       (s"C_0", namespacePath.map(n => s"NS_${n}"), s"T_${index}")
   ```
   
   I would like to add config parameters for each entity name (e.g. 
`table-name-prefix = "T_"`, ...).  That way, longer names are possible and 
entity names are computed in a single place.  Otherwise, the code becomes 
fragile (as it starts to be in the *Actions classes).
   
   I have not started this yet, so just a TODO reminder here will be sufficient.



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset 
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+    seed: Int,
+    readers: Seq[Distribution],
+    writers: Seq[Distribution],
+    durationInMinutes: Int
+) {
+  require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer 
is required")
+  require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+  def loadDistributionsList(config: Config, key: String): List[Distribution] =
+    config.getConfigList(key).asScala.toList.map { conf =>
+      Distribution(
+        count = conf.getInt("count"),
+        mean = conf.getDouble("mean"),
+        variance = conf.getDouble("variance")
+      )
+    }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  def printDescription(dataset: DatasetParameters): Unit = {
+    println(s"Summary for ${this}:")
+
+    // Visualize distributions
+    printVisualization(dataset.maxPossibleTables)
+
+    // Warn if a large amount of resampling will be needed
+    val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+    def resampleStream: LazyList[Double] =
+      LazyList.continually(sample(dataset.maxPossibleTables, 
debugRandomNumberProvider))
+
+    val (_, resamples) = resampleStream.zipWithIndex
+      .take(100000)
+      .find { case (value, _) => value >= 0 && value < 
dataset.maxPossibleTables }
+      .map { case (value, index) => (value, index) }
+      .getOrElse((-1, 100000))
+
+    if (resamples > 100) {
+      logger.warn(
+        s"A distribution appears to require aggressive resampling: ${this} 
took ${resamples + 1} samples!"
+      )
+    }
+  }
+
+  /**
+   * Return a value in [0, items) based on this distribution using truncated 
normal resampling.
+   */
+  def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = {
+    val stddev = math.sqrt(variance)
+    // Resample until the value is in [0, 1]
+    val maxSamples = 100000
+    val value = Iterator
+      .continually(randomNumberProvider.next() * stddev + mean)
+      .take(maxSamples)
+      .find(x => x >= 0.0 && x <= 1.0)
+      .getOrElse(
+        throw new RuntimeException(
+          s"Failed to sample a value in [0, 1] after ${maxSamples} attempts"
+        )
+      )
+
+    (value * items).toInt.min(items - 1)
+  }
+
+  def printVisualization(tables: Int, samples: Int = 100000, bins: Int = 10): 
Unit = {
+    val binCounts = Array.fill(bins)(0)
+    val hits = new mutable.HashMap[Int, Int]()
+    val rng = RandomNumberProvider("visualization".hashCode, -1)

Review Comment:
   Same comment here about the seed.  AFAICT, we may be printing a distribution 
that does not _exactly_ match what is going to be executed, because the seed is 
different.  It could be useful to pass the seed as parameter to this method and 
ensure the user-provided seed is used.



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala:
##########
@@ -96,5 +98,9 @@ case class AuthenticationActions(
    * scenario.
    */
   val restoreAccessTokenInSession: ChainBuilder =
-    exec(session => session.set("accessToken", accessToken.get()))
+    asLongAs(_ => accessToken.get() == null) {

Review Comment:
   This should not be possible.  See the comment in the simulation class.



##########
benchmarks/src/gatling/resources/benchmark-defaults.conf:
##########
@@ -161,4 +161,32 @@ workload {
     # Default: 5
     duration-in-minutes = 5
   }
+
+  # Configuration for the WeightedWorkloadOnTreeDataset simulation
+  weighted-workload-on-tree-dataset {
+    # Seed used for RNG during the test
+    seed = 42

Review Comment:
   nit: feel free to create a global `seed` parameter if you want.  I had added 
one a while ago but removed it after I made all workloads deterministic.



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+  ConnectionParameters,
+  DatasetParameters,
+  Distribution,
+  RandomNumberProvider,
+  WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified 
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables 
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree 
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  // 
--------------------------------------------------------------------------------
+  // Load parameters
+  // 
--------------------------------------------------------------------------------
+  val cp: ConnectionParameters = config.connectionParameters
+  val dp: DatasetParameters = config.datasetParameters
+  val wp: WorkloadParameters = config.workloadParameters
+
+  println("### Reader distributions ###")
+  wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+  println("### Writer distributions ###")
+  wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+  // 
--------------------------------------------------------------------------------
+  // Helper values
+  // 
--------------------------------------------------------------------------------
+  private val accessToken: AtomicReference[String] = new AtomicReference()
+
+  private val authActions = AuthenticationActions(cp, accessToken)
+  private val tblActions = TableActions(dp, wp, accessToken)
+
+  // 
--------------------------------------------------------------------------------
+  // Authentication related workloads
+  // 
--------------------------------------------------------------------------------
+  val refreshOauthForDuration: ScenarioBuilder =
+    scenario("Authenticate every minute using the Iceberg REST API")
+      .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+        feed(authActions.feeder())
+          .exec(authActions.authenticateAndSaveAccessToken)
+          .pause(30.seconds)
+      }
+
+  val waitForAuthentication: ScenarioBuilder =
+    scenario("Wait for the authentication token to be available")
+      .asLongAs(_ => accessToken.get() == null) {
+        pause(1.second)
+      }
+
+  // 
--------------------------------------------------------------------------------
+  // Build up the HTTP protocol configuration and set up the simulation
+  // 
--------------------------------------------------------------------------------
+  private val httpProtocol = http
+    .baseUrl(cp.baseUrl)
+    .acceptHeader("application/json")
+    .contentTypeHeader("application/json")
+    .disableCaching
+
+  // 
--------------------------------------------------------------------------------
+  // Create all reader/writer scenarios and prepare them for injection
+  // 
--------------------------------------------------------------------------------
+  private val readerScenarioBuilders: List[ScenarioBuilder] =
+    wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case 
(dist, i) =>
+      (0 until dist.count).map { threadId =>
+        val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, 
i * 1000 + threadId)

Review Comment:
   I don't understand the `i * 1000 + threadId` part here.  Same for the `i * 
2000 + threadId` part in the write scenario builders.  Is this because you have 
a strict requirement to have different table distributions across read and 
write workloads?



##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+  ConnectionParameters,
+  DatasetParameters,
+  Distribution,
+  RandomNumberProvider,
+  WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified 
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables 
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree 
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  // 
--------------------------------------------------------------------------------
+  // Load parameters
+  // 
--------------------------------------------------------------------------------
+  val cp: ConnectionParameters = config.connectionParameters
+  val dp: DatasetParameters = config.datasetParameters
+  val wp: WorkloadParameters = config.workloadParameters
+
+  println("### Reader distributions ###")
+  wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+  println("### Writer distributions ###")
+  wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+  // 
--------------------------------------------------------------------------------
+  // Helper values
+  // 
--------------------------------------------------------------------------------
+  private val accessToken: AtomicReference[String] = new AtomicReference()
+
+  private val authActions = AuthenticationActions(cp, accessToken)
+  private val tblActions = TableActions(dp, wp, accessToken)
+
+  // 
--------------------------------------------------------------------------------
+  // Authentication related workloads
+  // 
--------------------------------------------------------------------------------
+  val refreshOauthForDuration: ScenarioBuilder =
+    scenario("Authenticate every minute using the Iceberg REST API")
+      .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+        feed(authActions.feeder())
+          .exec(authActions.authenticateAndSaveAccessToken)
+          .pause(30.seconds)
+      }
+
+  val waitForAuthentication: ScenarioBuilder =
+    scenario("Wait for the authentication token to be available")
+      .asLongAs(_ => accessToken.get() == null) {
+        pause(1.second)
+      }
+
+  // 
--------------------------------------------------------------------------------
+  // Build up the HTTP protocol configuration and set up the simulation
+  // 
--------------------------------------------------------------------------------
+  private val httpProtocol = http
+    .baseUrl(cp.baseUrl)
+    .acceptHeader("application/json")
+    .contentTypeHeader("application/json")
+    .disableCaching
+
+  // 
--------------------------------------------------------------------------------
+  // Create all reader/writer scenarios and prepare them for injection
+  // 
--------------------------------------------------------------------------------
+  private val readerScenarioBuilders: List[ScenarioBuilder] =
+    wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case 
(dist, i) =>
+      (0 until dist.count).map { threadId =>
+        val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, 
i * 1000 + threadId)
+        scenario(s"Reader-$i-$threadId")
+          .exec(authActions.restoreAccessTokenInSession)
+          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+            exec { session =>
+              val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+              val (catalog, namespace, table) =
+                Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+              // Checked in `fetchTable`
+              val expectedProperties: Map[String, String] = (0 until 
dp.numTableProperties)
+                .map(id => s"InitialAttribute_$id" -> s"$id")
+                .toMap
+              val expectedLocation =
+                
s"${dp.defaultBaseLocation}/$catalog/${namespace.mkString("/")}/${table}"
+
+              session
+                .set("catalogName", catalog)
+                .set("multipartNamespace", 
namespace.mkString(0x1f.toChar.toString))
+                .set("tableName", table)
+                .set("initialProperties", expectedProperties)
+                .set("location", expectedLocation)
+            }.exec(tblActions.fetchTable)
+          }
+      }
+    }.toList
+
+  private val writerScenarioBuilders: List[ScenarioBuilder] =
+    wp.weightedWorkloadOnTreeDataset.writers.zipWithIndex.flatMap { case 
(dist, i) =>
+      (0 until dist.count).map { threadId =>
+        val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, 
i * 2000 + threadId)
+        scenario(s"Writer-$i-$threadId")
+          .exec(authActions.restoreAccessTokenInSession)
+          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+            exec { session =>
+              val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+              val (catalog, namespace, table) =
+                Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+              // Needed for `updateTable`
+              val now = System.currentTimeMillis
+              val newProperty = s"""{"last_updated": "${now}"}"""
+
+              session
+                .set("catalogName", catalog)
+                .set("multipartNamespace", 
namespace.mkString(0x1f.toChar.toString))
+                .set("tableName", table)
+                .set("newProperty", newProperty)
+            }.exec(tblActions.updateTable)
+          }
+      }
+    }.toList
+
+  // 
--------------------------------------------------------------------------------
+  // Setup
+  // 
--------------------------------------------------------------------------------
+  setUp(
+    List(
+      refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
+      waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol)
+    ) ++
+      
readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ++
+      
writerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol))
+  )

Review Comment:
   This setUp() call is the reason why the access token is sometimes null.  You 
probably want to use the `.andThen(...)` method to chain workloads in a 
sequence.  See for example: 
https://github.com/apache/polaris-tools/blob/main/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala#L151-L163
   
   Typically, the way you could write that is:
   
   1. In parallel, start the `refreshOauthForDuration` and 
`waitForAuthentication` scenarios
   2. In sequence, start your read and write senarios *only after* 
`waitForAuthentication` has completed
   
   This would look like:
   
   ```
     setUp(
       refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
       waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol)
         .andThen(
           
readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ++
           
writerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol))
         )
     )
   ```
   
   That way, it will be impossible for the `accessToken` session attribute to 
be `null`.  So the changes in AuthenticationActions will not be required 
anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to