This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 136201ab4e75 [SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis integration tests
136201ab4e75 is described below

commit 136201ab4e750f03dfbc662dacb96b848f1ace2f
Author: Vlad Rozov <[email protected]>
AuthorDate: Fri Oct 17 14:48:25 2025 +0900

    [SPARK-53927][BUILD][DSTREAM] Upgrade kinesis client and fix kinesis integration tests
    
    ### What changes were proposed in this pull request?
    Upgrade the Kinesis client and the AWS Java SDK to fix the Kinesis integration tests.
    
    The Kinesis client is upgraded from `1.12.0` to `1.15.3` (the latest 1.x release).
    The AWS Java SDK is upgraded from `1.11.655` to `1.12.681` (the version used by the Kinesis client).
    The AWS Kinesis Producer Library (used only in tests) is upgraded from `0.12.8` to `1.0.5`.
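    
    For reference, a minimal sbt-style sketch of the upgraded coordinates (illustrative only; Spark's build pins these through the `pom.xml` properties changed in this patch):
    ```
    // Illustrative sbt coordinates for the upgraded Kinesis libraries.
    libraryDependencies ++= Seq(
      "com.amazonaws"           % "amazon-kinesis-client"   % "1.15.3",
      "com.amazonaws"           % "aws-java-sdk-core"       % "1.12.681",
      // The Kinesis Producer Library moved to the software.amazon.kinesis group in 1.x (test-only here).
      "software.amazon.kinesis" % "amazon-kinesis-producer" % "1.0.5" % Test
    )
    ```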
    
    ### Why are the changes needed?
    The existing AWS client libraries reference Jackson classes (for example `PropertyNamingStrategy$PascalCaseStrategy`) that are no longer present on Spark's test classpath, so the Kinesis integration tests fail at runtime:
    ```
    ENABLE_KINESIS_TESTS=1 ./build/sbt -Pkinesis-asl
    ...
    Using endpoint URL https://kinesis.us-west-2.amazonaws.com for creating Kinesis streams for tests.
    [info] WithoutAggregationKinesisBackedBlockRDDSuite:
    [info] org.apache.spark.streaming.kinesis.WithoutAggregationKinesisBackedBlockRDDSuite *** ABORTED *** (1 second, 131 milliseconds)
    [info]   java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy
    [info]   at com.amazonaws.services.kinesis.AmazonKinesisClient.<clinit>(AmazonKinesisClient.java:86)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient$lzycompute(KinesisTestUtils.scala:59)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient(KinesisTestUtils.scala:58)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.describeStream(KinesisTestUtils.scala:169)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.findNonExistentStreamName(KinesisTestUtils.scala:182)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.createStream(KinesisTestUtils.scala:85)
    [info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.$anonfun$beforeAll$1(KinesisBackedBlockRDDSuite.scala:45)
    [info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled(KinesisFunSuite.scala:41)
    [info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled$(KinesisFunSuite.scala:39)
    [info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:26)
    [info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:43)
    [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
    [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
    [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
    [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
    [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
    [info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    [info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    [info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    [info]   at java.base/java.lang.Thread.run(Thread.java:840)
    [info]   Cause: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.PropertyNamingStrategy$PascalCaseStrategy
    [info]   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    [info]   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    [info]   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    [info]   at com.amazonaws.services.kinesis.AmazonKinesisClient.<clinit>(AmazonKinesisClient.java:86)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient$lzycompute(KinesisTestUtils.scala:59)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.kinesisClient(KinesisTestUtils.scala:58)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.describeStream(KinesisTestUtils.scala:169)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.findNonExistentStreamName(KinesisTestUtils.scala:182)
    [info]   at org.apache.spark.streaming.kinesis.KinesisTestUtils.createStream(KinesisTestUtils.scala:85)
    [info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.$anonfun$beforeAll$1(KinesisBackedBlockRDDSuite.scala:45)
    [info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled(KinesisFunSuite.scala:41)
    [info]   at org.apache.spark.streaming.kinesis.KinesisFunSuite.runIfTestsEnabled$(KinesisFunSuite.scala:39)
    [info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.runIfTestsEnabled(KinesisBackedBlockRDDSuite.scala:26)
    [info]   at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDTests.beforeAll(KinesisBackedBlockRDDSuite.scala:43)
    [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
    [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
    [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
    [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
    [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
    [info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    [info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    [info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    [info]   at java.base/java.lang.Thread.run(Thread.java:840)
    [error] Uncaught exception when running org.apache.spark.streaming.kinesis.WithoutAggregationKinesisBackedBlockRDDSuite: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/PropertyNamingStrategy$PascalCaseStrategy
    ```
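    
    For context, the failure reduces to a single class lookup against the Jackson version on the test classpath; a minimal, hypothetical sketch (not part of this patch):
    ```
    // Throws ClassNotFoundException when the nested PascalCaseStrategy class is
    // absent from the jackson-databind version on the classpath.
    object JacksonClassCheck {
      def main(args: Array[String]): Unit = {
        Class.forName("com.fasterxml.jackson.databind.PropertyNamingStrategy$PascalCaseStrategy")
      }
    }
    ```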
    
    ### Does this PR introduce _any_ user-facing change?
    No, only a minor version upgrade of the Kinesis and AWS Java SDK libraries.
    
    ### How was this patch tested?
    ```
    ENABLE_KINESIS_TESTS=1 ./build/sbt -Pkinesis-asl
    ```
    and
    ```
    ENABLE_KINESIS_TESTS=1 build/mvn test -Pkinesis-asl -pl connector/kinesis-asl
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52630 from vrozov/SPARK-53927.
    
    Authored-by: Vlad Rozov <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 connector/kinesis-asl/pom.xml                          |  8 +++++++-
 .../streaming/kinesis/KPLBasedKinesisTestUtils.scala   | 16 +++++++++-------
 .../spark/streaming/kinesis/KinesisTestUtils.scala     | 18 ++++++------------
 pom.xml                                                |  6 +++---
 4 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/connector/kinesis-asl/pom.xml b/connector/kinesis-asl/pom.xml
index 2d02b910bf43..aa90e3c8a94f 100644
--- a/connector/kinesis-asl/pom.xml
+++ b/connector/kinesis-asl/pom.xml
@@ -64,10 +64,16 @@
       <version>${aws.java.sdk.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.amazonaws</groupId>
+      <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>${aws.kinesis.producer.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.kjetland</groupId>
+          <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- manage this up explicitly to match Spark; com.amazonaws:aws-java-sdk-pom specifies
       2.6.7 but says we can manage it up -->
diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index 58b64ba11d35..6b06f197a137 100644
--- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -18,15 +18,14 @@ package org.apache.spark.streaming.kinesis
 
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer,
-  KinesisProducerConfiguration, UserRecordResult}
 import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-import org.apache.spark.util.ThreadUtils
+import software.amazon.kinesis.producer.{KinesisProducer => KPLProducer,
+  KinesisProducerConfiguration, UserRecordResult}
 
 private[kinesis] class KPLBasedKinesisTestUtils(streamShardCount: Int = 2)
     extends KinesisTestUtils(streamShardCount) {
@@ -53,6 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
   }
 
   override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val executor = Executors.newSingleThreadExecutor()
     val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
     data.foreach { num =>
       val str = num.toString
@@ -63,15 +63,17 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
 
         override def onSuccess(result: UserRecordResult): Unit = {
           val shardId = result.getShardId
-          val seqNumber = result.getSequenceNumber()
+          val seqNumber = result.getSequenceNumber
           val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
             new ArrayBuffer[(Int, String)]())
           sentSeqNumbers += ((num, seqNumber))
         }
       }
-      Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
+      Futures.addCallback(future, kinesisCallBack, executor)
     }
     producer.flushSync()
-    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
+    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq.sortBy(_._2))
   }
 }
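
The hunk above replaces `ThreadUtils.sameThreadExecutorService()` with a dedicated single-thread executor that is shut down and drained before the collected sequence numbers are returned. A standalone sketch of that pattern, assuming only Guava and the JDK (the `SettableFuture` merely stands in for the KPL `addUserRecord` future):

```
import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.{FutureCallback, Futures, SettableFuture}

object CallbackExecutorSketch {
  def main(args: Array[String]): Unit = {
    // Callbacks run on this executor instead of the thread completing the future.
    val executor = Executors.newSingleThreadExecutor()
    val future = SettableFuture.create[String]()

    Futures.addCallback(future, new FutureCallback[String] {
      override def onSuccess(result: String): Unit = println(s"recorded $result")
      override def onFailure(t: Throwable): Unit = t.printStackTrace()
    }, executor)

    future.set("seq-1") // stands in for the producer completing a record

    // Drain the callback queue before reading any state the callbacks update.
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Waiting for the executor to terminate is what makes it safe to read (and sort) the callback-updated map afterwards.
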
diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 3f06d476f08d..7674cef105e7 100644
--- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
 import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
 import com.amazonaws.services.kinesis.model._
+import com.amazonaws.waiters.WaiterParameters
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
@@ -61,6 +62,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
     client
   }
 
+  private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()
+
   private lazy val dynamoDB = {
     val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
     dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
@@ -184,18 +187,9 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
   }
 
   private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTimeNs = System.nanoTime()
-    while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      describeStream(streamNameToWaitFor).foreach { description =>
-        val streamStatus = description.getStreamStatus()
-        logDebug(s"\t- current state: $streamStatus\n")
-        if ("ACTIVE".equals(streamStatus)) {
-          return
-        }
-      }
-    }
-    require(false, s"Stream $streamName never became active")
+    val describeStreamRequest = new DescribeStreamRequest()
+      .withStreamName(streamNameToWaitFor)
+    streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
   }
 }
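
The change above delegates the wait-for-active polling to the AWS SDK's built-in waiter. A standalone sketch of the same call sequence, assuming AWS SDK for Java v1 on the classpath (the stream name and the default client builder are illustrative, not taken from this patch):

```
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.DescribeStreamRequest
import com.amazonaws.waiters.WaiterParameters

object StreamWaiterSketch {
  def main(args: Array[String]): Unit = {
    val kinesis = AmazonKinesisClientBuilder.defaultClient()

    // The waiter polls DescribeStream with built-in retries until the stream is
    // available (or the waiter gives up), replacing the hand-rolled sleep loop.
    val request = new DescribeStreamRequest().withStreamName("my-test-stream")
    kinesis.waiters().streamExists().run(new WaiterParameters(request))
  }
}
```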
 
diff --git a/pom.xml b/pom.xml
index 741cebf601cf..37eb9dfbc4b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,12 +158,12 @@
     <codahale.metrics.version>4.2.33</codahale.metrics.version>
     <!-- Should be consistent with SparkBuild.scala and docs -->
     <avro.version>1.12.0</avro.version>
-    <aws.kinesis.client.version>1.12.0</aws.kinesis.client.version>
+    <aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
     <!-- Should be consistent with Kinesis client dependency -->
-    <aws.java.sdk.version>1.11.655</aws.java.sdk.version>
+    <aws.java.sdk.version>1.12.681</aws.java.sdk.version>
     <aws.java.sdk.v2.version>2.29.52</aws.java.sdk.v2.version>
     <!-- the producer is used in tests -->
-    <aws.kinesis.producer.version>0.12.8</aws.kinesis.producer.version>
+    <aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
     <!-- Do not use 3.0.0: https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
     <gcs-connector.version>hadoop3-2.2.28</gcs-connector.version>
     <analyticsaccelerator-s3.version>1.3.0</analyticsaccelerator-s3.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
