This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 136201ab4e75 [SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and
fix kinesis integration tests
136201ab4e75 is described below
commit 136201ab4e750f03dfbc662dacb96b848f1ace2f
Author: Vlad Rozov <[email protected]>
AuthorDate: Fri Oct 17 14:48:25 2025 +0900
[SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis
integration tests
### What changes were proposed in this pull request?
Upgrade Kinesis client and AWS Java SDK to fix Kinesis integration tests.
Kinesis client is upgraded from `1.12.0` to `1.15.3` (the latest release on the 1.x line)
AWS Java SDK is upgraded from `1.11.655` to `1.12.681` (the one used by
Kinesis client)
AWS Kinesis producer library (used in test) upgraded from `0.12.8` to
`1.0.5`
### Why are the changes needed?
Existing clients are not compatible, causing Kinesis integration tests to
fail at runtime:
```
ENABLE_KINESIS_TESTS=1 ./build/sbt -Pkinesis-asl
...
Using endpoint URL https://kinesis.us-west-2.amazonaws.com for creating
Kinesis streams for tests.
[info] WithoutAggregationKinesisBackedBlockRDDSuite:
[info]
org.apache.spark.streaming.kinesis.WithoutAggregationKinesisBackedBlockRDDSuite
*** ABORTED *** (1 second, 131 milliseconds)
[info] java.lang.NoClassDefFoundError:
com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy
[info] at
com.amazonaws.services.kinesis.AmazonKinesisClient.<clinit>(AmazonKinesisClient.java:86)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient$lzycompute(KinesisTestUtils.scala:59)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient(KinesisTestUtils.scala:58)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.describeStream(KinesisTestUtils.scala:169)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.findNonExistentStreamName(KinesisTestUtils.scala:182)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.createStream(KinesisTestUtils.scala:85)
[info] at
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.$anonfun$beforeAll$1(KinesisBackedBlockRDDSuite.scala:45)
[info] at
org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled(KinesisFunSuite.scala:41)
[info] at
org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled$(KinesisFunSuite.scala:39)
[info] at
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:26)
[info] at
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:43)
[info] at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] Cause: java.lang.ClassNotFoundException:
com.fasterxml.jackson.databind.PropertyNamingStrategy$PascalCaseStrategy
[info] at
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
[info] at
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
[info] at
com.amazonaws.services.kinesis.AmazonKinesisClient.<clinit>(AmazonKinesisClient.java:86)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient$lzycompute(KinesisTestUtils.scala:59)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient(KinesisTestUtils.scala:58)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.describeStream(KinesisTestUtils.scala:169)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.findNonExistentStreamName(KinesisTestUtils.scala:182)
[info] at
org.apache.spark.streaming.kinesis.KinesisTestUtils.createStream(KinesisTestUtils.scala:85)
[info] at
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.$anonfun$beforeAll$1(KinesisBackedBlockRDDSuite.scala:45)
[info] at
org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled(KinesisFunSuite.scala:41)
[info] at
org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled$(KinesisFunSuite.scala:39)
[info] at
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:26)
[info] at
org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:43)
[info] at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[error] Uncaught exception when running
org.apache.spark.streaming.kinesis.WithoutAggregationKinesisBackedBlockRDDSuite:
java.lang.NoClassDefFoundError:
com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy
```
### Does this PR introduce _any_ user-facing change?
No, only minor version upgrade for the Kinesis and AWS Java SDK libraries
### How was this patch tested?
```
ENABLE_KINESIS_TESTS=1 ./build/sbt -Pkinesis-asl
```
and
```
ENABLE_KINESIS_TESTS=1 build/mvn test -Pkinesis-asl -pl
connector/kinesis-asl
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52630 from vrozov/SPARK-53927.
Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
connector/kinesis-asl/pom.xml | 8 +++++++-
.../streaming/kinesis/KPLBasedKinesisTestUtils.scala | 16 +++++++++-------
.../spark/streaming/kinesis/KinesisTestUtils.scala | 18 ++++++------------
pom.xml | 6 +++---
4 files changed, 25 insertions(+), 23 deletions(-)
diff --git a/connector/kinesis-asl/pom.xml b/connector/kinesis-asl/pom.xml
index 2d02b910bf43..aa90e3c8a94f 100644
--- a/connector/kinesis-asl/pom.xml
+++ b/connector/kinesis-asl/pom.xml
@@ -64,10 +64,16 @@
<version>${aws.java.sdk.version}</version>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
+ <groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>${aws.kinesis.producer.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.kjetland</groupId>
+ <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- manage this up explicitly to match Spark;
com.amazonaws:aws-java-sdk-pom specifies
2.6.7 but says we can manage it up -->
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index 58b64ba11d35..6b06f197a137 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
+import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
- KinesisProducerConfiguration, UserRecordResult}
import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-import org.apache.spark.util.ThreadUtils
+import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
+ KinesisProducerConfiguration, UserRecordResult}
private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
extends KinesisTestUtils(streamShardCount) {
@@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String)
extends KinesisDataG
}
override def sendData(streamName: String, data: Seq[Int]): Map[String,
Seq[(Int, String)]] = {
+ val executor = Executors.newSingleThreadExecutor()
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int,
String)]]()
data.foreach { num =>
val str = num.toString
@@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String)
extends KinesisDataG
override def onSuccess(result: UserRecordResult): Unit = {
val shardId = result.getShardId
- val seqNumber = result.getSequenceNumber()
+ val seqNumber = result.getSequenceNumber
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
}
}
- Futures.addCallback(future, kinesisCallBack,
ThreadUtils.sameThreadExecutorService())
+ Futures.addCallback(future, kinesisCallBack, executor)
}
producer.flushSync()
- shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
+ executor.shutdown()
+ executor.awaitTermination(10, TimeUnit.SECONDS)
+ shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
}
}
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 3f06d476f08d..7674cef105e7 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import com.amazonaws.services.kinesis.model._
+import com.amazonaws.waiters.WaiterParameters
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
@@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int
= 2) extends Loggi
client
}
+ private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()
+
private lazy val dynamoDB = {
val dynamoDBClient = new AmazonDynamoDBClient(new
DefaultAWSCredentialsProviderChain())
dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
- val startTimeNs = System.nanoTime()
- while (System.nanoTime() - startTimeNs <
TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
- Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
- describeStream(streamNameToWaitFor).foreach { description =>
- val streamStatus = description.getStreamStatus()
- logDebug(s"\t- current state: $streamStatus\n")
- if ("ACTIVE".equals(streamStatus)) {
- return
- }
- }
- }
- require(false, s"Stream $streamName never became active")
+ val describeStreamRequest = new DescribeStreamRequest()
+ .withStreamName(streamNameToWaitFor)
+ streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
}
}
diff --git a/pom.xml b/pom.xml
index 741cebf601cf..37eb9dfbc4b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,12 +158,12 @@
<codahale.metrics.version>4.2.33</codahale.metrics.version>
<!-- Should be consistent with SparkBuild.scala and docs -->
<avro.version>1.12.0</avro.version>
- <aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
+ <aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
- <aws.java.sdk.version>1.11.655</aws.java.sdk.version>
+ <aws.java.sdk.version>1.12.681</aws.java.sdk.version>
<aws.java.sdk.v2.version>2.29.52</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
- <aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
+ <aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
<!-- Do not use 3.0.0:
https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
<gcs-connector.version>hadoop3-2.2.28</gcs-connector.version>
<analyticsaccelerator-s3.version>1.3.0</analyticsaccelerator-s3.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]