pingtimeout commented on code in PR #21:
URL: https://github.com/apache/polaris-tools/pull/21#discussion_r2131657209
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/TableActions.scala:
##########
@@ -184,6 +184,7 @@ case class TableActions(
http("Fetch Table")
.get("/api/catalog/v1/#{catalogName}/namespaces/#{multipartNamespace}/tables/#{tableName}")
.header("Authorization", "Bearer #{accessToken}")
+ .header("If-None-Match", "")
Review Comment:
Is it possible this is a leftover from the previous discussion about
disabling caching?
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+ seed: Int,
+ readers: Seq[Distribution],
+ writers: Seq[Distribution],
+ durationInMinutes: Int
+) {
+ require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer
is required")
+ require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+ def loadDistributionsList(config: Config, key: String): List[Distribution] =
+ config.getConfigList(key).asScala.toList.map { conf =>
+ Distribution(
+ count = conf.getInt("count"),
+ mean = conf.getDouble("mean"),
+ variance = conf.getDouble("variance")
+ )
+ }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ def printDescription(dataset: DatasetParameters): Unit = {
+ println(s"Summary for ${this}:")
+
+ // Visualize distributions
+ printVisualization(dataset.maxPossibleTables)
+
+ // Warn if a large amount of resampling will be needed
+ val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
Review Comment:
It would be better if the seed used for the RNG was the one configured in
the `.conf` file. Not a big deal though as if a distribution is bad, then most
likely it will be bad regardless of the seed. But still something that might
be considered to follow the principle of least surprise.
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+ ConnectionParameters,
+ DatasetParameters,
+ Distribution,
+ RandomNumberProvider,
+ WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ //
--------------------------------------------------------------------------------
+ // Load parameters
+ //
--------------------------------------------------------------------------------
+ val cp: ConnectionParameters = config.connectionParameters
+ val dp: DatasetParameters = config.datasetParameters
+ val wp: WorkloadParameters = config.workloadParameters
+
+ println("### Reader distributions ###")
+ wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+ println("### Writer distributions ###")
+ wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+ //
--------------------------------------------------------------------------------
+ // Helper values
+ //
--------------------------------------------------------------------------------
+ private val accessToken: AtomicReference[String] = new AtomicReference()
+
+ private val authActions = AuthenticationActions(cp, accessToken)
+ private val tblActions = TableActions(dp, wp, accessToken)
+
+ //
--------------------------------------------------------------------------------
+ // Authentication related workloads
+ //
--------------------------------------------------------------------------------
+ val refreshOauthForDuration: ScenarioBuilder =
+ scenario("Authenticate every minute using the Iceberg REST API")
Review Comment:
nit: the name says "every minute" but the pause is 30s
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala:
##########
@@ -85,8 +86,9 @@ case class AuthenticationActions(
.check(jsonPath("$.access_token").saveAs("accessToken"))
)
.exec { session =>
- if (session.contains("accessToken"))
+ if (session.contains("accessToken") && session("accessToken") != null)
{
Review Comment:
This should not be possible. See the comment in the simulation class.
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+ ConnectionParameters,
+ DatasetParameters,
+ Distribution,
+ RandomNumberProvider,
+ WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ //
--------------------------------------------------------------------------------
+ // Load parameters
+ //
--------------------------------------------------------------------------------
+ val cp: ConnectionParameters = config.connectionParameters
+ val dp: DatasetParameters = config.datasetParameters
+ val wp: WorkloadParameters = config.workloadParameters
+
+ println("### Reader distributions ###")
+ wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+ println("### Writer distributions ###")
+ wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+ //
--------------------------------------------------------------------------------
+ // Helper values
+ //
--------------------------------------------------------------------------------
+ private val accessToken: AtomicReference[String] = new AtomicReference()
+
+ private val authActions = AuthenticationActions(cp, accessToken)
+ private val tblActions = TableActions(dp, wp, accessToken)
+
+ //
--------------------------------------------------------------------------------
+ // Authentication related workloads
+ //
--------------------------------------------------------------------------------
+ val refreshOauthForDuration: ScenarioBuilder =
+ scenario("Authenticate every minute using the Iceberg REST API")
+ .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ feed(authActions.feeder())
+ .exec(authActions.authenticateAndSaveAccessToken)
+ .pause(30.seconds)
+ }
+
+ val waitForAuthentication: ScenarioBuilder =
+ scenario("Wait for the authentication token to be available")
+ .asLongAs(_ => accessToken.get() == null) {
+ pause(1.second)
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Build up the HTTP protocol configuration and set up the simulation
+ //
--------------------------------------------------------------------------------
+ private val httpProtocol = http
+ .baseUrl(cp.baseUrl)
+ .acceptHeader("application/json")
+ .contentTypeHeader("application/json")
+ .disableCaching
+
+ //
--------------------------------------------------------------------------------
+ // Create all reader/writer scenarios and prepare them for injection
+ //
--------------------------------------------------------------------------------
+ private val readerScenarioBuilders: List[ScenarioBuilder] =
+ wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case
(dist, i) =>
+ (0 until dist.count).map { threadId =>
+ val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed,
i * 1000 + threadId)
+ scenario(s"Reader-$i-$threadId")
+ .exec(authActions.restoreAccessTokenInSession)
+ .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ exec { session =>
+ val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+ val (catalog, namespace, table) =
+ Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+ // Checked in `fetchTable`
+ val expectedProperties: Map[String, String] = (0 until
dp.numTableProperties)
+ .map(id => s"InitialAttribute_$id" -> s"$id")
+ .toMap
+ val expectedLocation =
+
s"${dp.defaultBaseLocation}/$catalog/${namespace.mkString("/")}/${table}"
+
+ session
+ .set("catalogName", catalog)
+ .set("multipartNamespace",
namespace.mkString(0x1f.toChar.toString))
+ .set("tableName", table)
+ .set("initialProperties", expectedProperties)
+ .set("location", expectedLocation)
Review Comment:
This part does not feel right as in all the other workloads, it is
implemented as a feeder in the `TableActions` class. What is the rationale
behind having this in the simulation itself and in an `exec` block?
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/DatasetParameters.scala:
##########
@@ -54,7 +54,7 @@ case class DatasetParameters(
numViewProperties: Int
) {
val nAryTree: NAryTreeBuilder = NAryTreeBuilder(nsWidth, nsDepth)
- private val maxPossibleTables = nAryTree.numberOfLastLevelElements *
numTablesPerNs
+ val maxPossibleTables = nAryTree.numberOfLastLevelElements * numTablesPerNs
Review Comment:
The intent behind initially giving `maxPossibleTables` private visibility
was to only use `numTables` so that the number of tables could be capped to a
certain value.
I.e. if there are 2^16 max possible tables in your dataset, but you want to
restrict a specific workload on the first 2000 tables, you would set
`max-tables = 2000`. That way, you can reuse a larger dataset for more
concentrated workloads.
It does not mean that all workloads have to support this though. I just
want to emphasize what the initial idea was.
If you want to keep using `maxPossibleTables`, would you mind adding a type
annotation to it? IntelliJ displays a warning when a public field has no
annotated type and is just `var xyz = ...`.
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+ seed: Int,
+ readers: Seq[Distribution],
+ writers: Seq[Distribution],
+ durationInMinutes: Int
+) {
+ require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer
is required")
+ require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+ def loadDistributionsList(config: Config, key: String): List[Distribution] =
+ config.getConfigList(key).asScala.toList.map { conf =>
+ Distribution(
+ count = conf.getInt("count"),
+ mean = conf.getDouble("mean"),
+ variance = conf.getDouble("variance")
+ )
+ }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ def printDescription(dataset: DatasetParameters): Unit = {
+ println(s"Summary for ${this}:")
+
+ // Visualize distributions
+ printVisualization(dataset.maxPossibleTables)
+
+ // Warn if a large amount of resampling will be needed
+ val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+ def resampleStream: LazyList[Double] =
+ LazyList.continually(sample(dataset.maxPossibleTables,
debugRandomNumberProvider))
+
+ val (_, resamples) = resampleStream.zipWithIndex
+ .take(100000)
+ .find { case (value, _) => value >= 0 && value <
dataset.maxPossibleTables }
+ .map { case (value, index) => (value, index) }
Review Comment:
Is this `.map` call doing anything?
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+ seed: Int,
+ readers: Seq[Distribution],
+ writers: Seq[Distribution],
+ durationInMinutes: Int
+) {
+ require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer
is required")
+ require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+ def loadDistributionsList(config: Config, key: String): List[Distribution] =
+ config.getConfigList(key).asScala.toList.map { conf =>
+ Distribution(
+ count = conf.getInt("count"),
+ mean = conf.getDouble("mean"),
+ variance = conf.getDouble("variance")
+ )
+ }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ def printDescription(dataset: DatasetParameters): Unit = {
+ println(s"Summary for ${this}:")
+
+ // Visualize distributions
+ printVisualization(dataset.maxPossibleTables)
+
+ // Warn if a large amount of resampling will be needed
+ val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+ def resampleStream: LazyList[Double] =
+ LazyList.continually(sample(dataset.maxPossibleTables,
debugRandomNumberProvider))
+
+ val (_, resamples) = resampleStream.zipWithIndex
+ .take(100000)
+ .find { case (value, _) => value >= 0 && value <
dataset.maxPossibleTables }
+ .map { case (value, index) => (value, index) }
+ .getOrElse((-1, 100000))
+
+ if (resamples > 100) {
+ logger.warn(
+ s"A distribution appears to require aggressive resampling: ${this}
took ${resamples + 1} samples!"
+ )
+ }
+ }
+
+ /**
+ * Return a value in [0, items) based on this distribution using truncated
normal resampling.
+ */
+ def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = {
+ val stddev = math.sqrt(variance)
+ // Resample until the value is in [0, 1]
+ val maxSamples = 100000
+ val value = Iterator
+ .continually(randomNumberProvider.next() * stddev + mean)
+ .take(maxSamples)
+ .find(x => x >= 0.0 && x <= 1.0)
+ .getOrElse(
+ throw new RuntimeException(
+ s"Failed to sample a value in [0, 1] after ${maxSamples} attempts"
+ )
+ )
+
+ (value * items).toInt.min(items - 1)
+ }
+
+ def printVisualization(tables: Int, samples: Int = 100000, bins: Int = 10):
Unit = {
+ val binCounts = Array.fill(bins)(0)
+ val hits = new mutable.HashMap[Int, Int]()
+ val rng = RandomNumberProvider("visualization".hashCode, -1)
+
+ (1 to samples).foreach { _ =>
+ val value = sample(tables, rng)
+ val bin = ((value.toDouble / tables) * bins).toInt.min(bins - 1)
+ hits.put(value, hits.getOrElse(value, 0) + 1)
+ binCounts(bin) += 1
+ }
+
+ val maxBarWidth = 50
+ val total = binCounts.sum.toDouble
+ println(" Range | % of Samples | Visualization")
+ println(" --------------|--------------|------------------")
+
+ (0 until bins).foreach { i =>
+ val low = i.toDouble / bins
+ val high = (i + 1).toDouble / bins
+ val percent = binCounts(i) / total * 100
+ val bar = "█" * ((percent / 100 * maxBarWidth).round.toInt)
+ println(f" [$low%.1f - $high%.1f) | $percent%6.2f%% | $bar")
+ }
+ println()
+
+ val mode = hits.maxBy(_._2)
+ val modePercentage: Int = Math.round(mode._2.toFloat / samples * 100)
+ println(s" The most frequently selected table was chosen in
~${modePercentage}% of samples")
+
+ println()
+ }
+}
+
+object Distribution {
+
+ // Map an index back to a table path
+ def tableIndexToIdentifier(index: Int, dp: DatasetParameters): (String,
List[String], String) = {
+ require(
+ dp.numTablesMax == -1,
+ "Sampling is incompatible with numTablesMax settings other than -1"
+ )
+
+ val namespaceIndex = index / dp.numTablesPerNs
+ val namespaceOrdinal =
dp.nAryTree.lastLevelOrdinals.toList.apply(namespaceIndex)
+ val namespacePath = dp.nAryTree.pathToRoot(namespaceOrdinal)
+
+ (s"C_0", namespacePath.map(n => s"NS_${n}"), s"T_${index}")
Review Comment:
```suggestion
// TODO Refactor this line once entity names are configurable
(s"C_0", namespacePath.map(n => s"NS_${n}"), s"T_${index}")
```
I would like to add config parameters for each entity name (e.g.
`table-name-prefix = "T_"`, ...). That way, longer names are possible and
entity names are computed in a single place. Otherwise, the code becomes
fragile (as it starts to be in the *Actions classes).
I have not started this yet, so just a TODO reminder here will be sufficient.
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/WeightedWorkloadOnTreeDatasetParameters.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.parameters
+
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.Logger
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.collection.immutable.LazyList
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Case class to hold the parameters for the WeightedWorkloadOnTreeDataset
simulation.
+ *
+ * @param seed The RNG seed to use
+ * @param readers A seq of distrbutions to use for reading tables
+ * @param writers A seq of distrbutions to use for writing to tables
+ */
+case class WeightedWorkloadOnTreeDatasetParameters(
+ seed: Int,
+ readers: Seq[Distribution],
+ writers: Seq[Distribution],
+ durationInMinutes: Int
+) {
+ require(readers.nonEmpty || writers.nonEmpty, "At least one reader or writer
is required")
+ require(durationInMinutes > 0, "Duration in minutes must be positive")
+}
+
+object WeightedWorkloadOnTreeDatasetParameters {
+ def loadDistributionsList(config: Config, key: String): List[Distribution] =
+ config.getConfigList(key).asScala.toList.map { conf =>
+ Distribution(
+ count = conf.getInt("count"),
+ mean = conf.getDouble("mean"),
+ variance = conf.getDouble("variance")
+ )
+ }
+}
+
+case class Distribution(count: Int, mean: Double, variance: Double) {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ def printDescription(dataset: DatasetParameters): Unit = {
+ println(s"Summary for ${this}:")
+
+ // Visualize distributions
+ printVisualization(dataset.maxPossibleTables)
+
+ // Warn if a large amount of resampling will be needed
+ val debugRandomNumberProvider = RandomNumberProvider("debug".hashCode, -1)
+ def resampleStream: LazyList[Double] =
+ LazyList.continually(sample(dataset.maxPossibleTables,
debugRandomNumberProvider))
+
+ val (_, resamples) = resampleStream.zipWithIndex
+ .take(100000)
+ .find { case (value, _) => value >= 0 && value <
dataset.maxPossibleTables }
+ .map { case (value, index) => (value, index) }
+ .getOrElse((-1, 100000))
+
+ if (resamples > 100) {
+ logger.warn(
+ s"A distribution appears to require aggressive resampling: ${this}
took ${resamples + 1} samples!"
+ )
+ }
+ }
+
+ /**
+ * Return a value in [0, items) based on this distribution using truncated
normal resampling.
+ */
+ def sample(items: Int, randomNumberProvider: RandomNumberProvider): Int = {
+ val stddev = math.sqrt(variance)
+ // Resample until the value is in [0, 1]
+ val maxSamples = 100000
+ val value = Iterator
+ .continually(randomNumberProvider.next() * stddev + mean)
+ .take(maxSamples)
+ .find(x => x >= 0.0 && x <= 1.0)
+ .getOrElse(
+ throw new RuntimeException(
+ s"Failed to sample a value in [0, 1] after ${maxSamples} attempts"
+ )
+ )
+
+ (value * items).toInt.min(items - 1)
+ }
+
+ def printVisualization(tables: Int, samples: Int = 100000, bins: Int = 10):
Unit = {
+ val binCounts = Array.fill(bins)(0)
+ val hits = new mutable.HashMap[Int, Int]()
+ val rng = RandomNumberProvider("visualization".hashCode, -1)
Review Comment:
Same comment here about the seed. AFAICT, we may be printing a distribution
that does not _exactly_ match what is going to be executed, because the seed is
different. It could be useful to pass the seed as parameter to this method and
ensure the user-provided seed is used.
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala:
##########
@@ -96,5 +98,9 @@ case class AuthenticationActions(
* scenario.
*/
val restoreAccessTokenInSession: ChainBuilder =
- exec(session => session.set("accessToken", accessToken.get()))
+ asLongAs(_ => accessToken.get() == null) {
Review Comment:
This should not be possible. See the comment in the simulation class.
##########
benchmarks/src/gatling/resources/benchmark-defaults.conf:
##########
@@ -161,4 +161,32 @@ workload {
# Default: 5
duration-in-minutes = 5
}
+
+ # Configuration for the WeightedWorkloadOnTreeDataset simulation
+ weighted-workload-on-tree-dataset {
+ # Seed used for RNG during the test
+ seed = 42
Review Comment:
nit: feel free to create a global `seed` parameter if you want. I had added
one a while ago but removed it after I made all workloads deterministic.
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+ ConnectionParameters,
+ DatasetParameters,
+ Distribution,
+ RandomNumberProvider,
+ WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ //
--------------------------------------------------------------------------------
+ // Load parameters
+ //
--------------------------------------------------------------------------------
+ val cp: ConnectionParameters = config.connectionParameters
+ val dp: DatasetParameters = config.datasetParameters
+ val wp: WorkloadParameters = config.workloadParameters
+
+ println("### Reader distributions ###")
+ wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+ println("### Writer distributions ###")
+ wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+ //
--------------------------------------------------------------------------------
+ // Helper values
+ //
--------------------------------------------------------------------------------
+ private val accessToken: AtomicReference[String] = new AtomicReference()
+
+ private val authActions = AuthenticationActions(cp, accessToken)
+ private val tblActions = TableActions(dp, wp, accessToken)
+
+ //
--------------------------------------------------------------------------------
+ // Authentication related workloads
+ //
--------------------------------------------------------------------------------
+ val refreshOauthForDuration: ScenarioBuilder =
+ scenario("Authenticate every minute using the Iceberg REST API")
+ .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ feed(authActions.feeder())
+ .exec(authActions.authenticateAndSaveAccessToken)
+ .pause(30.seconds)
+ }
+
+ val waitForAuthentication: ScenarioBuilder =
+ scenario("Wait for the authentication token to be available")
+ .asLongAs(_ => accessToken.get() == null) {
+ pause(1.second)
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Build up the HTTP protocol configuration and set up the simulation
+ //
--------------------------------------------------------------------------------
+ private val httpProtocol = http
+ .baseUrl(cp.baseUrl)
+ .acceptHeader("application/json")
+ .contentTypeHeader("application/json")
+ .disableCaching
+
+ //
--------------------------------------------------------------------------------
+ // Create all reader/writer scenarios and prepare them for injection
+ //
--------------------------------------------------------------------------------
+ private val readerScenarioBuilders: List[ScenarioBuilder] =
+ wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case
(dist, i) =>
+ (0 until dist.count).map { threadId =>
+ val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed,
i * 1000 + threadId)
Review Comment:
I don't understand the `i * 1000 + threadId` part here. Same for the `i *
2000 + threadId` part in the write scenario builders. Is this because you have
a strict requirement to have different table distributions across read and
write workloads?
##########
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.benchmarks.simulations
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.ScenarioBuilder
+import io.gatling.http.Predef._
+import org.apache.polaris.benchmarks.actions._
+import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
+import org.apache.polaris.benchmarks.parameters.{
+ ConnectionParameters,
+ DatasetParameters,
+ Distribution,
+ RandomNumberProvider,
+ WorkloadParameters
+}
+import org.slf4j.LoggerFactory
+
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.duration._
+
+/**
+ * This simulation performs reads and writes based on distributions specified
in the config. It
+ * allows the simulation of workloads where e.g. a small fraction of tables
get most writes. It is
+ * intended to be used against a Polaris instance with a pre-existing tree
dataset.
+ */
+class WeightedWorkloadOnTreeDataset extends Simulation {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ //
--------------------------------------------------------------------------------
+ // Load parameters
+ //
--------------------------------------------------------------------------------
+ val cp: ConnectionParameters = config.connectionParameters
+ val dp: DatasetParameters = config.datasetParameters
+ val wp: WorkloadParameters = config.workloadParameters
+
+ println("### Reader distributions ###")
+ wp.weightedWorkloadOnTreeDataset.readers.foreach(_.printDescription(dp))
+
+ println("### Writer distributions ###")
+ wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
+
+ //
--------------------------------------------------------------------------------
+ // Helper values
+ //
--------------------------------------------------------------------------------
+ private val accessToken: AtomicReference[String] = new AtomicReference()
+
+ private val authActions = AuthenticationActions(cp, accessToken)
+ private val tblActions = TableActions(dp, wp, accessToken)
+
+ //
--------------------------------------------------------------------------------
+ // Authentication related workloads
+ //
--------------------------------------------------------------------------------
+ val refreshOauthForDuration: ScenarioBuilder =
+ scenario("Authenticate every minute using the Iceberg REST API")
+ .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ feed(authActions.feeder())
+ .exec(authActions.authenticateAndSaveAccessToken)
+ .pause(30.seconds)
+ }
+
+ val waitForAuthentication: ScenarioBuilder =
+ scenario("Wait for the authentication token to be available")
+ .asLongAs(_ => accessToken.get() == null) {
+ pause(1.second)
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Build up the HTTP protocol configuration and set up the simulation
+ //
--------------------------------------------------------------------------------
+ private val httpProtocol = http
+ .baseUrl(cp.baseUrl)
+ .acceptHeader("application/json")
+ .contentTypeHeader("application/json")
+ .disableCaching
+
+ //
--------------------------------------------------------------------------------
+ // Create all reader/writer scenarios and prepare them for injection
+ //
--------------------------------------------------------------------------------
+ private val readerScenarioBuilders: List[ScenarioBuilder] =
+ wp.weightedWorkloadOnTreeDataset.readers.zipWithIndex.flatMap { case
(dist, i) =>
+ (0 until dist.count).map { threadId =>
+ val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed,
i * 1000 + threadId)
+ scenario(s"Reader-$i-$threadId")
+ .exec(authActions.restoreAccessTokenInSession)
+ .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ exec { session =>
+ val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+ val (catalog, namespace, table) =
+ Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+ // Checked in `fetchTable`
+ val expectedProperties: Map[String, String] = (0 until
dp.numTableProperties)
+ .map(id => s"InitialAttribute_$id" -> s"$id")
+ .toMap
+ val expectedLocation =
+
s"${dp.defaultBaseLocation}/$catalog/${namespace.mkString("/")}/${table}"
+
+ session
+ .set("catalogName", catalog)
+ .set("multipartNamespace",
namespace.mkString(0x1f.toChar.toString))
+ .set("tableName", table)
+ .set("initialProperties", expectedProperties)
+ .set("location", expectedLocation)
+ }.exec(tblActions.fetchTable)
+ }
+ }
+ }.toList
+
+ private val writerScenarioBuilders: List[ScenarioBuilder] =
+ wp.weightedWorkloadOnTreeDataset.writers.zipWithIndex.flatMap { case
(dist, i) =>
+ (0 until dist.count).map { threadId =>
+ val rnp = RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed,
i * 2000 + threadId)
+ scenario(s"Writer-$i-$threadId")
+ .exec(authActions.restoreAccessTokenInSession)
+ .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ exec { session =>
+ val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
+ val (catalog, namespace, table) =
+ Distribution.tableIndexToIdentifier(tableIndex, dp)
+
+ // Needed for `updateTable`
+ val now = System.currentTimeMillis
+ val newProperty = s"""{"last_updated": "${now}"}"""
+
+ session
+ .set("catalogName", catalog)
+ .set("multipartNamespace",
namespace.mkString(0x1f.toChar.toString))
+ .set("tableName", table)
+ .set("newProperty", newProperty)
+ }.exec(tblActions.updateTable)
+ }
+ }
+ }.toList
+
+ //
--------------------------------------------------------------------------------
+ // Setup
+ //
--------------------------------------------------------------------------------
+ setUp(
+ List(
+ refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
+ waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol)
+ ) ++
+
readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ++
+
writerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol))
+ )
Review Comment:
This setUp() call is the reason why the access token is sometimes null. You
probably want to use the `.andThen(...)` method to chain workloads in a
sequence. See for example:
https://github.com/apache/polaris-tools/blob/main/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala#L151-L163
Typically, the way you could write that is:
1. In parallel, start the `refreshOauthForDuration` and
`waitForAuthentication` scenarios
2. In sequence, start your read and write scenarios *only after*
`waitForAuthentication` has completed
This would look like:
```
setUp(
refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
waitForAuthentication.inject(atOnceUsers(1)).protocols(httpProtocol)
.andThen(
readerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol)) ++
writerScenarioBuilders.map(_.inject(atOnceUsers(1)).protocols(httpProtocol))
)
)
```
That way, it will be impossible for the `accessToken` session attribute to
be `null`. So the changes in AuthenticationActions will not be required
anymore.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]