This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d5b51fe31b9e [SPARK-45720][BUILD][DSTREAM][KINESIS] Upgrade KCL to 2.7.2 and remove AWS SDK for Java 1.x dependency
d5b51fe31b9e is described below
commit d5b51fe31b9ee53a0da802dac2c5c486c0533c2e
Author: Junyu Chen <[email protected]>
AuthorDate: Mon Dec 22 11:30:23 2025 +0900
[SPARK-45720][BUILD][DSTREAM][KINESIS] Upgrade KCL to 2.7.2 and remove AWS SDK for Java 1.x dependency
### What changes were proposed in this pull request?
This PR proposes to upgrade KCL to 2.7.2, based on junyuc25's [PR](https://github.com/apache/spark/pull/44211) with some updates.
By upgrading KCL, we can remove the AWS SDK for Java 1.x dependency; the sketch after the migration-guide link below illustrates the client-construction change this entails.
* Basic migration guide: https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html
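For orientation, the recurring change throughout this diff is the move from the SDK v1 mutable clients to the SDK v2 builder style. Below is a minimal sketch of the new client construction, mirroring what `KinesisWordCountASL.scala` now does; the endpoint URL and stream name are illustrative placeholders, not values fixed by this PR.
```scala
import java.net.URI

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest

object KinesisV2ClientSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint and stream name, for illustration only.
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val streamName = "kinesis-example-stream"

    // SDK v1 used `new AmazonKinesisClient(...)` followed by `setEndpoint(...)`;
    // SDK v2 configures everything through an immutable builder.
    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(DefaultCredentialsProvider.create())
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()

    // Request objects are also built rather than mutated via setters.
    val request = DescribeStreamRequest.builder().streamName(streamName).build()
    val numShards = kinesisClient.describeStream(request).streamDescription().shards().size()
    println(s"$streamName has $numShards shard(s)")

    kinesisClient.close()
  }
}
```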
### Why are the changes needed?
* KCL 1.x will reach end-of-life on January 30, 2026: https://docs.aws.amazon.com/streams/latest/dev/kcl-version-lifecycle-policy.html
* Currently, Spark depends on both AWS SDK for Java 1.x and 2.x. The 1.x dependency can be removed by this PR.
### Does this PR introduce _any_ user-facing change?
No behavior change is expected.
### How was this patch tested?
Confirmed that all Kinesis tests pass with the following commands.
* SBT
```
$ ENABLE_KINESIS_TESTS=1 nohup ./build/sbt -Pkinesis-asl 'streaming-kinesis-asl/test'
```
* Maven
```
$ ENABLE_KINESIS_TESTS=1 build/mvn -Pkinesis-asl \
    -Dtest=org.apache.spark.streaming.kinesis.JavaKinesisInputDStreamBuilderSuite \
    -DwildcardSuites=org.apache.spark.streaming.kinesis test
```
Also confirmed existing examples work.
```
# Need to do `build/sbt -Pkinesis-asl package` beforehand
# Producer
$ bin/run-example streaming.KinesisWordProducerASL kinesis-example-stream \
    https://kinesis.us-west-2.amazonaws.com 10 5
# Consumer
$ bin/run-example streaming.KinesisWordCountASL my-stream-app \
    kinesis-example-stream https://kinesis.us-west-2.amazonaws.com
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53256 from sarutak/upgrade-aws-sdk2.
Lead-authored-by: Junyu Chen <[email protected]>
Co-authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
connector/kinesis-asl/pom.xml | 58 +++++++-
.../streaming/JavaKinesisWordCountASL.java | 25 +++-
.../streaming/kinesis/KinesisInitialPositions.java | 2 +-
.../examples/streaming/KinesisExampleUtils.scala | 10 +-
.../examples/streaming/KinesisWordCountASL.scala | 47 ++++--
.../streaming/kinesis/KinesisBackedBlockRDD.scala | 86 ++++++-----
.../streaming/kinesis/KinesisCheckpointer.scala | 27 ++--
.../streaming/kinesis/KinesisInputDStream.scala | 28 ++--
.../spark/streaming/kinesis/KinesisReceiver.scala | 159 ++++++++++++---------
.../streaming/kinesis/KinesisRecordProcessor.scala | 133 +++++++++--------
.../kinesis/KinesisUtilsPythonHelper.scala | 4 +-
.../streaming/kinesis/SparkAWSCredentials.scala | 55 ++++---
.../JavaKinesisInputDStreamBuilderSuite.java | 2 +-
.../kinesis/KinesisCheckpointerSuite.scala | 14 +-
.../kinesis/KinesisInputDStreamBuilderSuite.scala | 6 +-
.../streaming/kinesis/KinesisReceiverSuite.scala | 103 ++++++++-----
.../streaming/kinesis/KinesisStreamSuite.scala | 14 +-
.../spark/streaming/kinesis/KinesisTestUtils.scala | 133 ++++++++++-------
docs/streaming-kinesis-integration.md | 27 ++--
pom.xml | 5 +-
20 files changed, 565 insertions(+), 373 deletions(-)
diff --git a/connector/kinesis-asl/pom.xml b/connector/kinesis-asl/pom.xml
index 8fd1ad1d0ece..de785e5c5db3 100644
--- a/connector/kinesis-asl/pom.xml
+++ b/connector/kinesis-asl/pom.xml
@@ -54,14 +54,64 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
+ <groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws.kinesis.client.version}</version>
+ <exclusions>
+ <!--
+ mbknor-jackson-jsonschema is necessary at runtime only if JSON
format schema is
+ registered using GlueSchemaRegistry. kinesis-asl currently doesn't
use this feature
+ so it should be safe to exclude the dependency.
+ -->
+ <exclusion>
+ <groupId>com.kjetland</groupId>
+ <artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>dynamodb</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kinesis</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>cloudwatch</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-sts</artifactId>
- <version>${aws.java.sdk.version}</version>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ <version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
diff --git
a/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
b/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 636af9f6c606..98ef98798bb0 100644
---
a/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++
b/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -16,6 +16,7 @@
*/
package org.apache.spark.examples.streaming;
+import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,8 +39,10 @@ import
org.apache.spark.streaming.kinesis.KinesisInputDStream;
import scala.Tuple2;
import scala.reflect.ClassTag$;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
/**
* Consumes messages from a Amazon Kinesis streams and does wordcount.
@@ -66,7 +69,7 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
* There is a companion helper class called KinesisWordProducerASL which puts
dummy data
* onto the Kinesis stream.
*
- * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * This code uses the DefaultCredentialsProvider to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
@@ -106,11 +109,19 @@ public final class JavaKinesisWordCountASL { // needs to
be public for access fr
String endpointUrl = args[2];
// Create a Kinesis client in order to determine the number of shards for
the given stream
- AmazonKinesisClient kinesisClient =
- new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
- kinesisClient.setEndpoint(endpointUrl);
+ KinesisClient kinesisClient =
+ KinesisClient.builder()
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .endpointOverride(URI.create(endpointUrl))
+ .httpClientBuilder(ApacheHttpClient.builder())
+ .build();
+
+ DescribeStreamRequest describeStreamRequest =
+ DescribeStreamRequest.builder()
+ .streamName(streamName)
+ .build();
int numShards =
-
kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+
kinesisClient.describeStream(describeStreamRequest).streamDescription().shards().size();
// In this example, we're going to create 1 Kinesis Receiver/input DStream
for each shard.
diff --git
a/connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
b/connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
index b5f5ab0e9054..936044c5297a 100644
---
a/connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
+++
b/connector/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPositions.java
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.kinesis;
-import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStream;
import java.io.Serializable;
import java.util.Date;
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala
index 737e5199e71a..ec3bc67f5e2a 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala
@@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming
import scala.jdk.CollectionConverters._
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesis
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata
private[streaming] object KinesisExampleUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
- RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+ val kinesisServiceMetadata = new KinesisServiceMetadata()
+ kinesisServiceMetadata.regions
.asScala
- .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
- .map(_.getName)
+ .find(r =>
kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
+ .map(_.id)
.getOrElse(
throw new IllegalArgumentException(s"Could not resolve region for
endpoint: $endpoint"))
}
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index cc24c378f4cb..217069bd16c6 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -18,15 +18,18 @@
// scalastyle:off println
package org.apache.spark.examples.streaming
+import java.net.URI
import java.nio.ByteBuffer
import scala.util.Random
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.http.apache.ApacheHttpClient
+import software.amazon.awssdk.services.kinesis.KinesisClient
+import software.amazon.awssdk.services.kinesis.model.{DescribeStreamRequest,
PutRecordRequest}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -101,13 +104,22 @@ object KinesisWordCountASL extends Logging {
// Determine the number of shards from the stream using the low-level
Kinesis Client
// from the AWS Java SDK.
- val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
- require(credentials != null,
+ val credentialsProvider = DefaultCredentialsProvider.create
+ require(credentialsProvider.resolveCredentials() != null,
"No AWS credentials found. Please specify credentials using one of the
methods specified " +
- "in
http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
- val kinesisClient = new AmazonKinesisClient(credentials)
- kinesisClient.setEndpoint(endpointUrl)
- val numShards =
kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+ "in
https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
+ val kinesisClient = KinesisClient.builder()
+ .credentialsProvider(credentialsProvider)
+ .endpointOverride(URI.create(endpointUrl))
+ .httpClientBuilder(ApacheHttpClient.builder())
+ .build()
+ val describeStreamRequest = DescribeStreamRequest.builder()
+ .streamName(streamName)
+ .build()
+ val numShards = kinesisClient.describeStream(describeStreamRequest)
+ .streamDescription
+ .shards
+ .size
// In this example, we're going to create 1 Kinesis Receiver/input DStream
for each shard.
@@ -221,8 +233,11 @@ object KinesisWordProducerASL {
val totals = scala.collection.mutable.Map[String, Int]()
// Create the low-level Kinesis Client from the AWS Java SDK.
- val kinesisClient = new AmazonKinesisClient(new
DefaultAWSCredentialsProviderChain())
- kinesisClient.setEndpoint(endpoint)
+ val kinesisClient = KinesisClient.builder()
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .endpointOverride(URI.create(endpoint))
+ .httpClientBuilder(ApacheHttpClient.builder())
+ .build()
println(s"Putting records onto stream $stream and endpoint $endpoint at a
rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per
record")
@@ -247,12 +262,14 @@ object KinesisWordProducerASL {
val partitionKey = s"partitionKey-$recordNum"
// Create a PutRecordRequest with an Array[Byte] version of the data
- val putRecordRequest = new PutRecordRequest().withStreamName(stream)
- .withPartitionKey(partitionKey)
- .withData(ByteBuffer.wrap(data.getBytes()))
+ val putRecordRequest = PutRecordRequest.builder()
+ .streamName(stream)
+ .partitionKey(partitionKey)
+ .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(data.getBytes())))
+ .build()
// Put the record onto the stream and capture the PutRecordResult
- val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+ kinesisClient.putRecord(putRecordRequest)
}
// Sleep for a second
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index ac3622f93321..a9a51db7abe1 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -17,16 +17,20 @@
package org.apache.spark.streaming.kinesis
+import java.net.URI
import java.util.concurrent.TimeUnit
+import java.util.stream.Collectors
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import com.amazonaws.auth.AWSCredentials
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
-import com.amazonaws.services.kinesis.model._
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
+import software.amazon.awssdk.http.apache.ApacheHttpClient
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.kinesis.KinesisClient
+import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest,
GetRecordsResponse, GetShardIteratorRequest, GetShardIteratorResponse,
ProvisionedThroughputExceededException, ShardIteratorType}
+import software.amazon.kinesis.retrieval.{AggregatorUtil, KinesisClientRecord}
import org.apache.spark._
import org.apache.spark.internal.Logging
@@ -84,7 +88,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
@transient private val _blockIds: Array[BlockId],
@transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient private val isBlockIdValid: Array[Boolean] = Array.empty,
- val messageHandler: Record => T =
KinesisInputDStream.defaultMessageHandler _,
+ val messageHandler: KinesisClientRecord => T =
KinesisInputDStream.defaultMessageHandler _,
val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
val kinesisReadConfigs: KinesisReadConfigurations =
KinesisReadConfigurations()
) extends BlockRDD[T](sc, _blockIds) {
@@ -112,9 +116,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
}
def getBlockFromKinesis(): Iterator[T] = {
- val credentials = kinesisCreds.provider.getCredentials
+ val credentialsProvider = kinesisCreds.provider
partition.seqNumberRanges.ranges.iterator.flatMap { range =>
- new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
+ new KinesisSequenceRangeIterator(credentialsProvider, endpointUrl,
regionName,
range, kinesisReadConfigs).map(messageHandler)
}
}
@@ -134,13 +138,19 @@ class KinesisBackedBlockRDD[T: ClassTag](
*/
private[kinesis]
class KinesisSequenceRangeIterator(
- credentials: AWSCredentials,
+ credentialsProvider: AwsCredentialsProvider,
endpointUrl: String,
regionId: String,
range: SequenceNumberRange,
- kinesisReadConfigs: KinesisReadConfigurations) extends
NextIterator[Record] with Logging {
-
- private val client = new AmazonKinesisClient(credentials)
+ kinesisReadConfigs: KinesisReadConfigurations)
+ extends NextIterator[KinesisClientRecord] with Logging {
+
+ private val client = KinesisClient.builder()
+ .credentialsProvider(credentialsProvider)
+ .region(Region.of(regionId))
+ .endpointOverride(URI.create(endpointUrl))
+ .httpClientBuilder(ApacheHttpClient.builder())
+ .build()
private val streamName = range.streamName
private val shardId = range.shardId
// AWS limits to maximum of 10k records per get call
@@ -148,12 +158,11 @@ class KinesisSequenceRangeIterator(
private var toSeqNumberReceived = false
private var lastSeqNumber: String = null
- private var internalIterator: Iterator[Record] = null
-
- client.setEndpoint(endpointUrl)
+ private var internalIterator: Iterator[KinesisClientRecord] = null
+ private val aggregatorUtil = new AggregatorUtil()
- override protected def getNext(): Record = {
- var nextRecord: Record = null
+ override protected def getNext(): KinesisClientRecord = {
+ var nextRecord: KinesisClientRecord = null
if (toSeqNumberReceived) {
finished = true
} else {
@@ -183,11 +192,11 @@ class KinesisSequenceRangeIterator(
// Get the record, copy the data into a byte array and remember its
sequence number
nextRecord = internalIterator.next()
- lastSeqNumber = nextRecord.getSequenceNumber()
+ lastSeqNumber = nextRecord.sequenceNumber
// If the this record's sequence number matches the stopping sequence
number, then make sure
// the iterator is marked finished next time getNext() is called
- if (nextRecord.getSequenceNumber == range.toSeqNumber) {
+ if (nextRecord.sequenceNumber == range.toSeqNumber) {
toSeqNumberReceived = true
}
}
@@ -196,7 +205,7 @@ class KinesisSequenceRangeIterator(
}
override protected def close(): Unit = {
- client.shutdown()
+ client.close()
}
/**
@@ -205,7 +214,7 @@ class KinesisSequenceRangeIterator(
private def getRecords(
iteratorType: ShardIteratorType,
seqNum: String,
- recordCount: Int): Iterator[Record] = {
+ recordCount: Int): Iterator[KinesisClientRecord] = {
val shardIterator = getKinesisIterator(iteratorType, seqNum)
val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
result._1
@@ -217,19 +226,23 @@ class KinesisSequenceRangeIterator(
*/
private def getRecordsAndNextKinesisIterator(
shardIterator: String,
- recordCount: Int): (Iterator[Record], String) = {
- val getRecordsRequest = new GetRecordsRequest
- getRecordsRequest.setRequestCredentials(credentials)
- getRecordsRequest.setShardIterator(shardIterator)
- getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
- val getRecordsResult = retryOrTimeout[GetRecordsResult](
+ recordCount: Int): (Iterator[KinesisClientRecord], String) = {
+ val getRecordsRequest = GetRecordsRequest.builder()
+ .shardIterator(shardIterator)
+ .limit(Math.min(recordCount, this.maxGetRecordsLimit))
+ .build()
+ val getRecordsResponse = retryOrTimeout[GetRecordsResponse](
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
}
// De-aggregate records, if KPL was used in producing the records. The KCL
automatically
// handles de-aggregation during regular operation. This code path is used
during recovery
- val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
- (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
+ val records = getRecordsResponse.records()
+ .stream()
+ .map[KinesisClientRecord](r => KinesisClientRecord.fromRecord(r))
+ .collect(Collectors.toList[KinesisClientRecord]())
+ val recordIterator = aggregatorUtil.deaggregate(records)
+ (recordIterator.iterator().asScala, getRecordsResponse.nextShardIterator)
}
/**
@@ -239,17 +252,18 @@ class KinesisSequenceRangeIterator(
private def getKinesisIterator(
iteratorType: ShardIteratorType,
sequenceNumber: String): String = {
- val getShardIteratorRequest = new GetShardIteratorRequest
- getShardIteratorRequest.setRequestCredentials(credentials)
- getShardIteratorRequest.setStreamName(streamName)
- getShardIteratorRequest.setShardId(shardId)
- getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
- getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
- val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
+ val getShardIteratorRequest = GetShardIteratorRequest.builder()
+ .streamName(streamName)
+ .shardId(shardId)
+ .shardIteratorType(iteratorType)
+ .startingSequenceNumber(sequenceNumber)
+ .build()
+
+ val getShardIteratorResponse = retryOrTimeout[GetShardIteratorResponse](
s"getting shard iterator from sequence number $sequenceNumber") {
client.getShardIterator(getShardIteratorRequest)
}
- getShardIteratorResult.getShardIterator
+ getShardIteratorResponse.shardIterator
}
/** Helper method to retry Kinesis API request with exponential backoff and
timeouts */
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
index b259a5337f37..d6ce9c6c4f4c 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -20,7 +20,7 @@ import java.util.concurrent._
import scala.util.control.NonFatal
-import
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{SHARD_ID, WORKER_URL}
@@ -33,35 +33,34 @@ import org.apache.spark.util.{Clock, SystemClock}
*
* @param receiver The receiver that keeps track of which sequence numbers we
can checkpoint
* @param checkpointInterval How frequently we will checkpoint to DynamoDB
- * @param workerId Worker Id of KCL worker for logging purposes
+ * @param schedulerId Scheduler Id of KCL scheduler for logging purposes
* @param clock In order to use ManualClocks for the purpose of testing
*/
private[kinesis] class KinesisCheckpointer(
receiver: KinesisReceiver[_],
checkpointInterval: Duration,
- workerId: String,
+ schedulerId: String,
clock: Clock = new SystemClock) extends Logging {
// a map from shardId's to checkpointers
- private val checkpointers = new ConcurrentHashMap[String,
IRecordProcessorCheckpointer]()
+ private val checkpointers = new ConcurrentHashMap[String,
RecordProcessorCheckpointer]()
private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
private val checkpointerThread: RecurringTimer = startCheckpointerThread()
/** Update the checkpointer instance to the most recent one for the given
shardId. */
- def setCheckpointer(shardId: String, checkpointer:
IRecordProcessorCheckpointer): Unit = {
+ def setCheckpointer(shardId: String, checkpointer:
RecordProcessorCheckpointer): Unit = {
checkpointers.put(shardId, checkpointer)
}
/**
* Stop tracking the specified shardId.
*
- * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown
[[ShutdownReason.TERMINATE]],
- * we will use that to make the final checkpoint. If `null` is provided, we
will not make the
- * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+ * If a checkpointer is provided, we will use that to make the final
checkpoint. If `null`
+ * is provided, we will not make the checkpoint, e.g. in case of
[[ShutdownReason.ZOMBIE]].
*/
- def removeCheckpointer(shardId: String, checkpointer:
IRecordProcessorCheckpointer): Unit = {
+ def removeCheckpointer(shardId: String, checkpointer:
RecordProcessorCheckpointer): Unit = {
synchronized {
checkpointers.remove(shardId)
}
@@ -73,7 +72,7 @@ private[kinesis] class KinesisCheckpointer(
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
} catch {
case NonFatal(e) =>
- logError(log"Exception: WorkerId ${MDC(WORKER_URL, workerId)}
encountered an " +
+ logError(log"Exception: SchedulerId ${MDC(WORKER_URL, schedulerId)}
encountered an " +
log"exception while checkpointing to finish reading a shard of " +
log"${MDC(SHARD_ID, shardId)}.", e)
// Rethrow the exception to the Kinesis Worker that is managing this
RecordProcessor
@@ -83,7 +82,7 @@ private[kinesis] class KinesisCheckpointer(
}
/** Perform the checkpoint. */
- private def checkpoint(shardId: String, checkpointer:
IRecordProcessorCheckpointer): Unit = {
+ private def checkpoint(shardId: String, checkpointer:
RecordProcessorCheckpointer): Unit = {
try {
if (checkpointer != null) {
receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
@@ -93,8 +92,8 @@ private[kinesis] class KinesisCheckpointer(
if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
/* Perform the checkpoint */
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4,
100)
- logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at
sequence number" +
- s" $latestSeqNum for shardId $shardId")
+ logDebug(s"Checkpoint: schedulerId $schedulerId completed
checkpoint at sequence " +
+ s" number $latestSeqNum for shardId $shardId")
lastCheckpointedSeqNums.put(shardId, latestSeqNum)
}
}
@@ -127,7 +126,7 @@ private[kinesis] class KinesisCheckpointer(
*/
private def startCheckpointerThread(): RecurringTimer = {
val period = checkpointInterval.milliseconds
- val threadName = s"Kinesis Checkpointer - Worker $workerId"
+ val threadName = s"Kinesis Checkpointer - scheduler $schedulerId"
val timer = new RecurringTimer(clock, period, _ => checkpointAll(),
threadName)
timer.start()
logDebug(s"Started checkpointer thread: $threadName")
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 9e432eda6251..11b6d28266a0 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -17,12 +17,11 @@
package org.apache.spark.streaming.kinesis
-import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
-import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream,
KinesisClientLibConfiguration}
-import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.kinesis.common.InitialPositionInStream
+import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
+import software.amazon.kinesis.retrieval.KinesisClientRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -42,7 +41,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
val checkpointAppName: String,
val checkpointInterval: Duration,
val _storageLevel: StorageLevel,
- val messageHandler: Record => T,
+ val messageHandler: KinesisClientRecord => T,
val kinesisCreds: SparkAWSCredentials,
val dynamoDBCreds: Option[SparkAWSCredentials],
val cloudWatchCreds: Option[SparkAWSCredentials],
@@ -275,7 +274,7 @@ object KinesisInputDStream {
/**
* Sets the CloudWatch metrics level. Defaults to
- * [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom
value is specified.
+ * [[MetricsLevel.DETAILED]] if no custom value is specified.
*
* @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics
level
* @return Reference to this [[KinesisInputDStream.Builder]]
@@ -289,8 +288,8 @@ object KinesisInputDStream {
/**
* Sets the enabled CloudWatch metrics dimensions. Defaults to
- * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]]
- * if no custom value is specified.
+ * the set of [[MetricsUtil.OPERATION_DIMENSION_NAME]] and
+ * [[MetricsUtil.SHARD_ID_DIMENSION_NAME]] if no custom value is specified.
*
* @param metricsEnabledDimensions Set[String] to specify which CloudWatch
metrics dimensions
* should be enabled
@@ -307,11 +306,12 @@ object KinesisInputDStream {
* Create a new instance of [[KinesisInputDStream]] with configured
parameters and the provided
* message handler.
*
- * @param handler Function converting [[Record]] instances read by the KCL
to DStream type [[T]]
+ * @param handler Function converting [[KinesisClientRecord]] instances
read by the KCL to
+ * DStream type [[T]]
* @return Instance of [[KinesisInputDStream]] constructed with configured
parameters
*/
def buildWithMessageHandler[T: ClassTag](
- handler: Record => T): KinesisInputDStream[T] = {
+ handler: KinesisClientRecord => T): KinesisInputDStream[T] = {
val ssc = getRequiredParam(streamingContext, "streamingContext")
new KinesisInputDStream(
ssc,
@@ -351,9 +351,9 @@ object KinesisInputDStream {
*/
def builder: Builder = new Builder
- private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
+ private[kinesis] def defaultMessageHandler(record: KinesisClientRecord):
Array[Byte] = {
if (record == null) return null
- val byteBuffer = record.getData()
+ val byteBuffer = record.data
val byteArray = new Array[Byte](byteBuffer.remaining())
byteBuffer.get(byteArray)
byteArray
@@ -365,7 +365,7 @@ object KinesisInputDStream {
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new
Latest()
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel =
StorageLevel.MEMORY_AND_DISK_2
private[kinesis] val DEFAULT_METRICS_LEVEL: MetricsLevel =
- KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
+ MetricsLevel.DETAILED
private[kinesis] val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] =
-
KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
+ Set(MetricsUtil.OPERATION_DIMENSION_NAME,
MetricsUtil.SHARD_ID_DIMENSION_NAME)
}
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index ab91431035fe..ed5aee83901f 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -16,17 +16,24 @@
*/
package org.apache.spark.streaming.kinesis
-import java.util.UUID
+import java.net.URI
+import java.util.{HashSet, List => JList, UUID}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
-import
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor,
IRecordProcessorCheckpointer, IRecordProcessorFactory}
-import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration,
Worker}
-import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
+import software.amazon.kinesis.common.{ConfigsBuilder,
InitialPositionInStreamExtended, KinesisClientUtil}
+import software.amazon.kinesis.coordinator.Scheduler
+import software.amazon.kinesis.metrics.{MetricsConfig, MetricsLevel}
+import software.amazon.kinesis.processor.{RecordProcessorCheckpointer,
ShardRecordProcessor, ShardRecordProcessorFactory}
+import software.amazon.kinesis.retrieval.KinesisClientRecord
+import software.amazon.kinesis.retrieval.polling.PollingConfig
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.WORKER_URL
@@ -39,12 +46,12 @@ import org.apache.spark.util.Utils
/**
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
- * This implementation relies on the Kinesis Client Library (KCL) Worker as
described here:
+ * This implementation relies on the Kinesis Client Library (KCL) Scheduler as
described here:
* https://github.com/awslabs/amazon-kinesis-client
*
* The way this Receiver works is as follows:
*
- * - The receiver starts a KCL Worker, which is essentially runs a threadpool
of multiple
+ * - The receiver starts a KCL Scheduler, which is essentially runs a
threadpool of multiple
* KinesisRecordProcessor
* - Each KinesisRecordProcessor receives data from a Kinesis shard in
batches. Each batch is
* inserted into a Block Generator, and the corresponding range of sequence
numbers is recorded.
@@ -62,7 +69,7 @@ import org.apache.spark.util.Utils
* DynamoDB (lease coordination and checkpointing) and
CloudWatch (metrics)
* @param initialPosition Instance of [[KinesisInitialPosition]]
* In the absence of Kinesis checkpoint info, this is
the
- * worker's initial starting position in the stream.
+ * scheduler's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* ([[KinesisInitialPositions.TrimHorizon]]) or
@@ -92,7 +99,7 @@ private[kinesis] class KinesisReceiver[T](
checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
- messageHandler: Record => T,
+ messageHandler: KinesisClientRecord => T,
kinesisCreds: SparkAWSCredentials,
dynamoDBCreds: Option[SparkAWSCredentials],
cloudWatchCreds: Option[SparkAWSCredentials],
@@ -108,19 +115,19 @@ private[kinesis] class KinesisReceiver[T](
*/
/**
- * workerId is used by the KCL should be based on the ip address of the
actual Spark Worker
+ * schedulerId is used by the KCL should be based on the ip address of the
actual Spark Worker
* where this code runs (not the driver's IP address.)
*/
- @volatile private var workerId: String = null
+ @volatile private var schedulerId: String = null
/**
- * Worker is the core client abstraction from the Kinesis Client Library
(KCL).
- * A worker can process more than one shards from the given stream.
- * Each shard is assigned its own IRecordProcessor and the worker run
multiple such
+ * Scheduler is the core client abstraction from the Kinesis Client Library
(KCL).
+ * A Scheduler can process more than one shards from the given stream.
+ * Each shard is assigned its own ShardRecordProcessor and the scheduler run
multiple such
* processors.
*/
- @volatile private var worker: Worker = null
- @volatile private var workerThread: Thread = null
+ @volatile private var scheduler: Scheduler = null
+ @volatile private var schedulerThread: Thread = null
/** BlockGenerator used to generates blocks out of Kinesis data */
@volatile private var blockGenerator: BlockGenerator = null
@@ -146,59 +153,71 @@ private[kinesis] class KinesisReceiver[T](
/**
* This is called when the KinesisReceiver starts and must be non-blocking.
- * The KCL creates and manages the receiving/processing thread pool through
Worker.run().
+ * The KCL creates and manages the receiving/processing thread pool through
Scheduler.run().
*/
override def onStart(): Unit = {
blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
- workerId = Utils.localHostName() + ":" + UUID.randomUUID()
+ schedulerId = Utils.localHostName() + ":" + UUID.randomUUID()
- kinesisCheckpointer = new KinesisCheckpointer(receiver,
checkpointInterval, workerId)
+ kinesisCheckpointer = new KinesisCheckpointer(receiver,
checkpointInterval, schedulerId)
val kinesisProvider = kinesisCreds.provider
- val kinesisClientLibConfiguration = {
- val baseClientLibConfiguration = new KinesisClientLibConfiguration(
- checkpointAppName,
- streamName,
- kinesisProvider,
- dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
- cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
- workerId)
- .withKinesisEndpoint(endpointUrl)
- .withTaskBackoffTimeMillis(500)
- .withRegionName(regionName)
- .withMetricsLevel(metricsLevel)
- .withMetricsEnabledDimensions(metricsEnabledDimensions.asJava)
-
- // Update the Kinesis client lib config with timestamp
- // if InitialPositionInStream.AT_TIMESTAMP is passed
- initialPosition match {
- case ts: AtTimestamp =>
-
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
- case _ =>
-
baseClientLibConfiguration.withInitialPositionInStream(initialPosition.getPosition)
+ val kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
+ KinesisAsyncClient.builder
+ .region(Region.of(regionName))
+ .credentialsProvider(kinesisProvider)
+ .endpointOverride(URI.create(endpointUrl)))
+ val dynamoClient = DynamoDbAsyncClient.builder
+ .region(Region.of(regionName))
+
.credentialsProvider(dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider))
+ .build
+ val cloudWatchClient = CloudWatchAsyncClient.builder
+ .region(Region.of(regionName))
+
.credentialsProvider(cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider))
+ .build
+ val recordProcessorFactory = new ShardRecordProcessorFactory {
+ override def shardRecordProcessor(): ShardRecordProcessor = {
+ new KinesisRecordProcessor(receiver, schedulerId)
}
}
- /*
- * RecordProcessorFactory creates impls of IRecordProcessor.
- * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
- * IRecordProcessor.processRecords() method.
- * We're using our custom KinesisRecordProcessor in this case.
- */
- val recordProcessorFactory = new IRecordProcessorFactory {
- override def createProcessor: IRecordProcessor =
- new KinesisRecordProcessor(receiver, workerId)
+ val configsBuilder = new ConfigsBuilder(streamName, checkpointAppName,
kinesisClient,
+ dynamoClient, cloudWatchClient, schedulerId, recordProcessorFactory)
+ val metricsConfig = new MetricsConfig(cloudWatchClient, checkpointAppName)
+ .metricsLevel(metricsLevel)
+ .metricsEnabledDimensions(new HashSet(metricsEnabledDimensions.asJava))
+
+ val initialPositionInStreamExtended = initialPosition match {
+ case ts: AtTimestamp =>
+
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(ts.getTimestamp)
+ case _ =>
+
InitialPositionInStreamExtended.newInitialPosition(initialPosition.getPosition)
}
- worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
- workerThread = new Thread() {
+ val pollingConfig = new PollingConfig(streamName, kinesisClient)
+ // To maintain the same behavior as SDK v1, set the interval to 1000.
+ pollingConfig.idleTimeBetweenReadsInMillis(1000)
+
+ scheduler = new Scheduler(
+ configsBuilder.checkpointConfig(),
+ configsBuilder.coordinatorConfig(),
+ configsBuilder.leaseManagementConfig(),
+ configsBuilder.lifecycleConfig(),
+ metricsConfig,
+ configsBuilder.processorConfig(),
+ configsBuilder.retrievalConfig()
+ .retrievalSpecificConfig(pollingConfig)
+ .initialPositionInStreamExtended(initialPositionInStreamExtended)
+ )
+
+ schedulerThread = new Thread() {
override def run(): Unit = {
try {
- worker.run()
+ scheduler.run()
} catch {
case NonFatal(e) =>
- restart("Error running the KCL worker in Receiver", e)
+ restart("Error running the KCL scheduler in Receiver", e)
}
}
}
@@ -206,29 +225,29 @@ private[kinesis] class KinesisReceiver[T](
blockIdToSeqNumRanges.clear()
blockGenerator.start()
- workerThread.setName(s"Kinesis Receiver ${streamId}")
- workerThread.setDaemon(true)
- workerThread.start()
+ schedulerThread.setName(s"Kinesis Receiver ${streamId}")
+ schedulerThread.setDaemon(true)
+ schedulerThread.start()
- logInfo(log"Started receiver with workerId ${MDC(WORKER_URL, workerId)}")
+ logInfo(log"Started receiver with schedulerId ${MDC(WORKER_URL,
schedulerId)}")
}
/**
* This is called when the KinesisReceiver stops.
- * The KCL worker.shutdown() method stops the receiving/processing threads.
+ * The KCL scheduler.shutdown() method stops the receiving/processing
threads.
* The KCL will do its best to drain and checkpoint any in-flight records
upon shutdown.
*/
override def onStop(): Unit = {
- if (workerThread != null) {
- if (worker != null) {
- worker.shutdown()
- worker = null
+ if (schedulerThread != null) {
+ if (scheduler != null) {
+ scheduler.shutdown()
+ scheduler = null
}
- workerThread.join()
- workerThread = null
- logInfo(log"Stopped receiver for workerId ${MDC(WORKER_URL, workerId)}")
+ schedulerThread.join()
+ schedulerThread = null
+ logInfo(log"Stopped receiver for schedulerId ${MDC(WORKER_URL,
schedulerId)}")
}
- workerId = null
+ schedulerId = null
if (kinesisCheckpointer != null) {
kinesisCheckpointer.shutdown()
kinesisCheckpointer = null
@@ -236,11 +255,11 @@ private[kinesis] class KinesisReceiver[T](
}
/** Add records of the given shard to the current block being generated */
- private[kinesis] def addRecords(shardId: String, records:
java.util.List[Record]): Unit = {
+ private[kinesis] def addRecords(shardId: String, records:
JList[KinesisClientRecord]): Unit = {
if (records.size > 0) {
val dataIterator = records.iterator().asScala.map(messageHandler)
val metadata = SequenceNumberRange(streamName, shardId,
- records.get(0).getSequenceNumber(), records.get(records.size() -
1).getSequenceNumber(),
+ records.get(0).sequenceNumber, records.get(records.size -
1).sequenceNumber,
records.size())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
}
@@ -261,7 +280,7 @@ private[kinesis] class KinesisReceiver[T](
* Set the checkpointer that will be used to checkpoint sequence numbers to
DynamoDB for the
* given shardId.
*/
- def setCheckpointer(shardId: String, checkpointer:
IRecordProcessorCheckpointer): Unit = {
+ def setCheckpointer(shardId: String, checkpointer:
RecordProcessorCheckpointer): Unit = {
assert(kinesisCheckpointer != null, "Kinesis Checkpointer not
initialized!")
kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
}
@@ -271,7 +290,7 @@ private[kinesis] class KinesisReceiver[T](
* checkpoint one last time for the given shard. If `checkpointer` is
`null`, then we will not
* checkpoint.
*/
- def removeCheckpointer(shardId: String, checkpointer:
IRecordProcessorCheckpointer): Unit = {
+ def removeCheckpointer(shardId: String, checkpointer:
RecordProcessorCheckpointer): Unit = {
assert(kinesisCheckpointer != null, "Kinesis Checkpointer not
initialized!")
kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
}
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 8304ddda96df..15964fcc75d9 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -16,71 +16,68 @@
*/
package org.apache.spark.streaming.kinesis
-import java.util.List
-
import scala.util.Random
import scala.util.control.NonFatal
-import
com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException,
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor,
IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
+import software.amazon.kinesis.exceptions.{InvalidStateException,
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import software.amazon.kinesis.lifecycle.events.{InitializationInput,
LeaseLostInput, ProcessRecordsInput, ShardEndedInput, ShutdownRequestedInput}
+import software.amazon.kinesis.processor.ShardRecordProcessor
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.LogKeys.{KINESIS_REASON, RETRY_INTERVAL,
SHARD_ID, WORKER_URL}
+import org.apache.spark.internal.LogKeys.{RETRY_INTERVAL, SHARD_ID, WORKER_URL}
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL)
IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for
each
+ * The Kinesis scheduler creates an instance of this KinesisRecordProcessor
for each
* shard in the Kinesis stream upon startup. This is normally done in
separate threads,
* but the KCLs within the KinesisReceivers will balance themselves out if you
create
* multiple Receivers.
*
* @param receiver Kinesis receiver
- * @param workerId for logging purposes
+ * @param schedulerId for logging purposes
*/
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T],
workerId: String)
- extends IRecordProcessor with Logging {
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T],
schedulerId: String)
+ extends ShardRecordProcessor with Logging {
// shardId populated during initialize()
@volatile
private var shardId: String = _
/**
- * The Kinesis Client Library calls this method during IRecordProcessor
initialization.
+ * The Kinesis Client Library calls this method during ShardRecordProcessor
initialization.
*
- * @param shardId assigned by the KCL to this particular RecordProcessor.
+ * @param initializationInput contains parameters to the
ShardRecordProcessor initialize method
*/
- override def initialize(shardId: String): Unit = {
- this.shardId = shardId
- logInfo(log"Initialized workerId ${MDC(WORKER_URL, workerId)} " +
+ override def initialize(initializationInput: InitializationInput): Unit = {
+ this.shardId = initializationInput.shardId
+ logInfo(log"Initialized schedulerId ${MDC(WORKER_URL, schedulerId)} " +
log"with shardId ${MDC(SHARD_ID, shardId)}")
}
/**
* This method is called by the KCL when a batch of records is pulled from
the Kinesis stream.
- * This is the record-processing bridge between the KCL's
IRecordProcessor.processRecords()
- * and Spark Streaming's Receiver.store().
+ * This is the record-processing bridge between the KCL's
ShardRecordProcessor.processRecords()
+ * and Spark Streaming's Receiver
*
- * @param batch list of records from the Kinesis stream shard
- * @param checkpointer used to update Kinesis when this batch has been
processed/stored
- * in the DStream
+ * @param processRecordsInput Provides the records to be processed as well
as information and
+ * capabilities related to them (eg checkpointing).
*/
- override def processRecords(batch: List[Record],
- checkpointer: IRecordProcessorCheckpointer): Unit = {
+ override def processRecords(processRecordsInput: ProcessRecordsInput): Unit
= {
+ val batch = processRecordsInput.records
+ val checkpointer = processRecordsInput.checkpointer
if (!receiver.isStopped()) {
try {
// Limit the number of processed records from Kinesis stream. This is
because the KCL cannot
// control the number of aggregated records to be fetched even if we
set `MaxRecords`
- // in `KinesisClientLibConfiguration`. For example, if we set 10 to
the number of max
- // records in a worker and a producer aggregates two records into one
message, the worker
+ // in `PollingConfig`. For example, if we set 10 to the number of max
records
+ // in a scheduler and a producer aggregates two records into one
message, the scheduler
// possibly 20 records every callback function called.
val maxRecords = receiver.getCurrentLimit
for (start <- 0 until batch.size by maxRecords) {
val miniBatch = batch.subList(start, math.min(start + maxRecords,
batch.size))
receiver.addRecords(shardId, miniBatch)
- logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records
" +
+ logDebug(s"Stored: Scheduler $schedulerId stored ${miniBatch.size}
records " +
s"for shardId $shardId")
}
receiver.setCheckpointer(shardId, checkpointer)
@@ -91,56 +88,68 @@ private[kinesis] class KinesisRecordProcessor[T](receiver:
KinesisReceiver[T], w
* This will potentially cause records since the last checkpoint to
be processed
* more than once.
*/
- logError(log"Exception: WorkerId ${MDC(WORKER_URL, workerId)}
encountered and " +
- log"exception while storing or checkpointing a batch for workerId
" +
- log"${MDC(WORKER_URL, workerId)} and shardId ${MDC(SHARD_ID,
shardId)}.", e)
+ logError(log"Exception: SchedulerId ${MDC(WORKER_URL, schedulerId)}
encountered and " +
+ log"exception while storing or checkpointing a batch for
schedulerId " +
+ log"${MDC(WORKER_URL, schedulerId)} and shardId ${MDC(SHARD_ID,
shardId)}.", e)
- /* Rethrow the exception to the Kinesis Worker that is managing this
RecordProcessor. */
+ /* Rethrow the exception to the Kinesis scheduler that is managing
+ this RecordProcessor. */
throw e
}
} else {
/* RecordProcessor has been stopped. */
- logInfo(log"Stopped: KinesisReceiver has stopped for workerId
${MDC(WORKER_URL, workerId)}" +
- log" and shardId ${MDC(SHARD_ID, shardId)}. No more records will be
processed.")
+ logInfo(log"Stopped: KinesisReceiver has stopped for schedulerId " +
+ log"${MDC(WORKER_URL, schedulerId)} and shardId ${MDC(SHARD_ID,
shardId)}. " +
+ log"No more records will be processed.")
}
}
/**
- * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
- * 1) the stream is resharding by splitting or merging adjacent shards
- * (ShutdownReason.TERMINATE)
- * 2) the failed or latent Worker has stopped sending heartbeats for
whatever reason
- * (ShutdownReason.ZOMBIE)
+ * Called when the lease that tied to this Kinesis record processor has been
lost.
+ * Once the lease has been lost the record processor can no longer
checkpoint.
+ *
+ * @param leaseLostInput gives access to information related to the loss of
the lease.
+ * Currently this has no functionality.
+ */
+ override def leaseLost(leaseLostInput: LeaseLostInput): Unit = {
+ logInfo(log"The lease for shardId: ${MDC(SHARD_ID, shardId)} is lost.")
+ receiver.removeCheckpointer(shardId, null)
+ }
+
+ /**
+ * Called when the shard that this Kinesis record processor is handling has
been completed.
+ * Once a shard has been completed no further records will ever arrive on
that shard.
*
- * @param checkpointer used to perform a Kinesis checkpoint for
ShutdownReason.TERMINATE
- * @param reason for shutdown (ShutdownReason.TERMINATE or
ShutdownReason.ZOMBIE)
+ * When this is called the record processor <b>must</b> checkpoint.
Otherwise an exception
+ * will be thrown and the all child shards of this shard will not make
progress.
+ *
+ * @param shardEndedInput provides access to a checkpointer method for
completing processing of
+ * the shard.
*/
- override def shutdown(
- checkpointer: IRecordProcessorCheckpointer,
- reason: ShutdownReason): Unit = {
- logInfo(log"Shutdown: Shutting down workerId ${MDC(WORKER_URL, workerId)}
" +
- log"with reason ${MDC(KINESIS_REASON, reason)}")
- // null if not initialized before shutdown:
+ override def shardEnded(shardEndedInput: ShardEndedInput): Unit = {
+ logInfo(log"Reached shard end. Checkpointing for shardId: ${MDC(SHARD_ID,
shardId)}")
if (shardId == null) {
- logWarning(log"No shardId for workerId ${MDC(WORKER_URL, workerId)}?")
+ logWarning(log"No shardId for schedulerId ${MDC(WORKER_URL,
schedulerId)}?")
} else {
- reason match {
- /*
- * TERMINATE Use Case. Checkpoint.
- * Checkpoint to indicate that all records from the shard have been
drained and processed.
- * It's now OK to read from the new shards that resulted from a
resharding event.
- */
- case ShutdownReason.TERMINATE => receiver.removeCheckpointer(shardId,
checkpointer)
+ receiver.removeCheckpointer(shardId, shardEndedInput.checkpointer)
+ }
+ }
- /*
- * ZOMBIE Use Case or Unknown reason. NoOp.
- * No checkpoint because other workers may have taken over and already
started processing
- * the same records.
- * This may lead to records being processed more than once.
- * Return null so that we don't checkpoint
- */
- case _ => receiver.removeCheckpointer(shardId, null)
- }
+ /**
+ * Called when the Scheduler has been requested to shutdown. This is called
while the
+ * Kinesis record processor still holds the lease so checkpointing is
possible. Once this method
+ * has completed the lease for the record processor is released, and
+ * {@link # leaseLost ( LeaseLostInput )} will be called at a later time.
+ *
+ * @param shutdownRequestedInput provides access to a checkpointer allowing
a record processor to
+ * checkpoint before the shutdown is completed.
+ */
+ override def shutdownRequested(shutdownRequestedInput:
ShutdownRequestedInput): Unit = {
+ logInfo(log"Shutdown: Shutting down schedulerId: ${MDC(WORKER_URL,
schedulerId)} ")
+ if (shardId == null) {
+ logWarning(log"No shardId for schedulerId ${MDC(WORKER_URL,
schedulerId)}?")
+ } else {
+ receiver.removeCheckpointer(shardId, shutdownRequestedInput.checkpointer)
}
}
}
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
index 8abaef6b834e..dc1098a33633 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
@@ -16,8 +16,8 @@
*/
package org.apache.spark.streaming.kinesis
-import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
+import software.amazon.kinesis.common.InitialPositionInStream
+import software.amazon.kinesis.metrics.MetricsLevel
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentials.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentials.scala
index e821adca20d2..e8ccdcd6a99b 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentials.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SparkAWSCredentials.scala
@@ -17,49 +17,52 @@
package org.apache.spark.streaming.kinesis
-import com.amazonaws.auth._
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials,
AwsCredentialsProvider, DefaultCredentialsProvider, StaticCredentialsProvider}
+import software.amazon.awssdk.services.sts.StsClient
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest
import org.apache.spark.internal.Logging
/**
* Serializable interface providing a method executors can call to obtain an
- * AWSCredentialsProvider instance for authenticating to AWS services.
+ * AwsCredentialsProvider instance for authenticating to AWS services.
*/
private[kinesis] sealed trait SparkAWSCredentials extends Serializable {
/**
- * Return an AWSCredentialProvider instance that can be used by the Kinesis
Client
+ * Return an AwsCredentialProvider instance that can be used by the Kinesis
Client
* Library to authenticate to AWS services (Kinesis, CloudWatch and
DynamoDB).
*/
- def provider: AWSCredentialsProvider
+ def provider: AwsCredentialsProvider
}
-/** Returns DefaultAWSCredentialsProviderChain for authentication. */
+/** Returns DefaultCredentialsProvider for authentication. */
private[kinesis] final case object DefaultCredentials extends
SparkAWSCredentials {
- def provider: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain
+ def provider: AwsCredentialsProvider = DefaultCredentialsProvider.create()
}
/**
- * Returns AWSStaticCredentialsProvider constructed using basic AWS keypair. Falls back to using
- * DefaultCredentialsProviderChain if unable to construct a AWSCredentialsProviderChain
+ * Returns StaticCredentialsProvider constructed using basic AWS keypair. Falls back to using
+ * DefaultCredentialsProvider if unable to construct a StaticCredentialsProvider
* instance with the provided arguments (e.g. if they are null).
*/
private[kinesis] final case class BasicCredentials(
awsAccessKeyId: String,
awsSecretKey: String) extends SparkAWSCredentials with Logging {
-  def provider: AWSCredentialsProvider = try {
-    new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
+  def provider: AwsCredentialsProvider = try {
+    StaticCredentialsProvider.create(AwsBasicCredentials.create(awsAccessKeyId, awsSecretKey))
} catch {
case e: IllegalArgumentException =>
- logWarning("Unable to construct AWSStaticCredentialsProvider with
provided keypair; " +
- "falling back to DefaultCredentialsProviderChain.", e)
- new DefaultAWSCredentialsProviderChain
+ logWarning("Unable to construct StaticCredentialsProvider with provided
keypair; " +
+ "falling back to DefaultCredentialsProvider.", e)
+ DefaultCredentialsProvider.create()
}
}
/**
- * Returns an STSAssumeRoleSessionCredentialsProvider instance which assumes an IAM
+ * Returns an StsAssumeRoleCredentialsProvider instance which assumes an IAM
* role in order to authenticate against resources in an external account.
*/
private[kinesis] final case class STSCredentials(
@@ -69,16 +72,24 @@ private[kinesis] final case class STSCredentials(
longLivedCreds: SparkAWSCredentials = DefaultCredentials)
extends SparkAWSCredentials {
- def provider: AWSCredentialsProvider = {
-    val builder = new STSAssumeRoleSessionCredentialsProvider.Builder(stsRoleArn, stsSessionName)
- .withLongLivedCredentialsProvider(longLivedCreds.provider)
+ def provider: AwsCredentialsProvider = {
+ val stsClient = StsClient.builder()
+ .credentialsProvider(longLivedCreds.provider)
+ .build()
+
+ val assumeRoleRequestBuilder = AssumeRoleRequest.builder()
+ .roleArn(stsRoleArn)
+ .roleSessionName(stsSessionName)
stsExternalId match {
case Some(stsExternalId) =>
- builder.withExternalId(stsExternalId)
- .build()
+ assumeRoleRequestBuilder.externalId(stsExternalId)
case None =>
- builder.build()
}
+
+ StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(stsClient)
+ .refreshRequest(assumeRoleRequestBuilder.build())
+ .build()
}
}
@@ -98,8 +109,8 @@ object SparkAWSCredentials {
*
* @note The given AWS keypair will be saved in DStream checkpoints if
checkpointing is
* enabled. Make sure that your checkpoint directory is secure. Prefer
using the
- * default provider chain instead if possible
- *
(http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default).
+ * default credentials provider instead if possible
+ *
(https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html).
*
* @param accessKeyId AWS access key ID
* @param secretKey AWS secret key
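As a usage sketch of the SDK v2 pattern adopted above (not part of this commit; the role ARN and session name are placeholders), a provider built this way hands out temporary credentials that refresh through STS:
```scala
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest

// Placeholder role ARN and session name, for illustration only.
val provider = StsAssumeRoleCredentialsProvider.builder()
  .stsClient(StsClient.builder().build()) // picks up the default credentials chain
  .refreshRequest(
    AssumeRoleRequest.builder()
      .roleArn("arn:aws:iam::123456789012:role/example-role")
      .roleSessionName("spark-kinesis-example")
      .build())
  .build()

// The provider caches the temporary credentials and refreshes them via STS as needed.
val credentials = provider.resolveCredentials()
println(credentials.accessKeyId())
```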
diff --git
a/connector/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
b/connector/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
index e64e08a38a4a..b10b7e04d2b7 100644
---
a/connector/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
+++
b/connector/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.streaming.kinesis;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -26,6 +25,7 @@ import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.Seconds;
+import software.amazon.kinesis.common.InitialPositionInStream;
public class JavaKinesisInputDStreamBuilderSuite extends
LocalJavaStreamingContext {
/**
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index 87592b6877b3..6dd589fe4d21 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -22,13 +22,13 @@ import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import org.scalatestplus.mockito.MockitoSugar
+import software.amazon.kinesis.processor.RecordProcessorCheckpointer
import org.apache.spark.streaming.{Duration, TestSuiteBase}
import org.apache.spark.util.ManualClock
@@ -39,7 +39,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
with PrivateMethodTester
with Eventually {
- private val workerId = "dummyWorkerId"
+ private val schedulerId = "dummySchedulerId"
private val shardId = "dummyShardId"
private val seqNum = "123"
private val otherSeqNum = "245"
@@ -48,7 +48,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
private val someOtherSeqNum = Some(otherSeqNum)
private var receiverMock: KinesisReceiver[Array[Byte]] = _
- private var checkpointerMock: IRecordProcessorCheckpointer = _
+ private var checkpointerMock: RecordProcessorCheckpointer = _
private var kinesisCheckpointer: KinesisCheckpointer = _
private var clock: ManualClock = _
@@ -56,9 +56,13 @@ class KinesisCheckpointerSuite extends TestSuiteBase
override def beforeEach(): Unit = {
receiverMock = mock[KinesisReceiver[Array[Byte]]]
- checkpointerMock = mock[IRecordProcessorCheckpointer]
+ checkpointerMock = mock[RecordProcessorCheckpointer]
clock = new ManualClock()
- kinesisCheckpointer = new KinesisCheckpointer(receiverMock,
checkpointInterval, workerId, clock)
+ kinesisCheckpointer = new KinesisCheckpointer(
+ receiverMock,
+ checkpointInterval,
+ schedulerId,
+ clock)
}
test("checkpoint is not called twice for the same sequence number") {
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
index 9f2e34e2e2f9..2d82282ecff0 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
@@ -21,10 +21,10 @@ import java.util.Calendar
import scala.jdk.CollectionConverters._
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
-import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.scalatest.BeforeAndAfterEach
import org.scalatestplus.mockito.MockitoSugar
+import software.amazon.kinesis.common.InitialPositionInStream
+import software.amazon.kinesis.metrics.{MetricsConfig, MetricsLevel}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext,
TestSuiteBase}
@@ -101,7 +101,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase
with BeforeAndAfterE
val customCloudWatchCreds = mock[SparkAWSCredentials]
val customMetricsLevel = MetricsLevel.NONE
val customMetricsEnabledDimensions =
-      KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet
+ MetricsConfig.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet
val dstream = builder
.endpointUrl(customEndpointUrl)
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index d008de3b3f1c..c83745f3f785 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,16 +20,16 @@ import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Arrays
-import com.amazonaws.services.kinesis.clientlibrary.exceptions._
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
import org.mockito.ArgumentMatchers.{anyList, anyString, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.scalatestplus.mockito.MockitoSugar
+import software.amazon.kinesis.exceptions.{InvalidStateException,
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import software.amazon.kinesis.lifecycle.events.{InitializationInput,
LeaseLostInput, ProcessRecordsInput, ShardEndedInput, ShutdownRequestedInput}
+import software.amazon.kinesis.processor.RecordProcessorCheckpointer
+import software.amazon.kinesis.retrieval.KinesisClientRecord
import org.apache.spark.streaming.{Duration, TestSuiteBase}
@@ -42,33 +42,43 @@ class KinesisReceiverSuite extends TestSuiteBase with
Matchers with BeforeAndAft
val app = "TestKinesisReceiver"
val stream = "mySparkStream"
val endpoint = "endpoint-url"
- val workerId = "dummyWorkerId"
+ val schedulerId = "dummySchedulerId"
val shardId = "dummyShardId"
val seqNum = "dummySeqNum"
val checkpointInterval = Duration(10)
val someSeqNum = Some(seqNum)
-  val record1 = new Record()
-  record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
-  val record2 = new Record()
-  record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
+ val dummyInitializationInput = InitializationInput.builder()
+ .shardId(shardId)
+ .build()
+
+ val record1 = KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
+ .build()
+ val record2 = KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
+ .build()
val batch = Arrays.asList(record1, record2)
var receiverMock: KinesisReceiver[Array[Byte]] = _
- var checkpointerMock: IRecordProcessorCheckpointer = _
+ var checkpointerMock: RecordProcessorCheckpointer = _
override def beforeFunction(): Unit = {
receiverMock = mock[KinesisReceiver[Array[Byte]]]
- checkpointerMock = mock[IRecordProcessorCheckpointer]
+ checkpointerMock = mock[RecordProcessorCheckpointer]
}
test("process records including store and set checkpointer") {
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.processRecords(batch, checkpointerMock)
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, schedulerId)
+ recordProcessor.initialize(dummyInitializationInput)
+ val processRecordsInput = ProcessRecordsInput.builder()
+ .records(batch)
+ .checkpointer(checkpointerMock)
+ .build()
+ recordProcessor.processRecords(processRecordsInput)
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, times(1)).addRecords(shardId, batch)
@@ -79,9 +89,13 @@ class KinesisReceiverSuite extends TestSuiteBase with
Matchers with BeforeAndAft
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.getCurrentLimit).thenReturn(1)
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.processRecords(batch, checkpointerMock)
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, schedulerId)
+ recordProcessor.initialize(dummyInitializationInput)
+ val processRecordsInput = ProcessRecordsInput.builder()
+ .records(batch)
+ .checkpointer(checkpointerMock)
+ .build()
+ recordProcessor.processRecords(processRecordsInput)
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1))
@@ -93,8 +107,12 @@ class KinesisReceiverSuite extends TestSuiteBase with
Matchers with BeforeAndAft
when(receiverMock.isStopped()).thenReturn(true)
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.processRecords(batch, checkpointerMock)
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, schedulerId)
+ val processRecordsInput = ProcessRecordsInput.builder()
+ .records(batch)
+ .checkpointer(checkpointerMock)
+ .build()
+ recordProcessor.processRecords(processRecordsInput)
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, never).addRecords(anyString, anyList())
@@ -109,9 +127,13 @@ class KinesisReceiverSuite extends TestSuiteBase with
Matchers with BeforeAndAft
).thenThrow(new RuntimeException())
intercept[RuntimeException] {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.processRecords(batch, checkpointerMock)
+ val recordProcessor = new KinesisRecordProcessor(receiverMock,
schedulerId)
+ recordProcessor.initialize(dummyInitializationInput)
+ val processRecordsInput = ProcessRecordsInput.builder()
+ .records(batch)
+ .checkpointer(checkpointerMock)
+ .build()
+ recordProcessor.processRecords(processRecordsInput)
}
verify(receiverMock, times(1)).isStopped()
@@ -119,27 +141,42 @@ class KinesisReceiverSuite extends TestSuiteBase with
Matchers with BeforeAndAft
verify(receiverMock, never).setCheckpointer(anyString,
meq(checkpointerMock))
}
- test("shutdown should checkpoint if the reason is TERMINATE") {
+ test("SPARK-45720: shutdownRequest should checkpoint") {
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, schedulerId)
+ val shutdownRequestedInput = ShutdownRequestedInput.builder()
+ .checkpointer(checkpointerMock)
+ .build()
+ recordProcessor.initialize(dummyInitializationInput)
+ recordProcessor.shutdownRequested(shutdownRequestedInput)
verify(receiverMock, times(1)).removeCheckpointer(meq(shardId),
meq(checkpointerMock))
}
+ test("SPARK-45720: shardEnded should checkpoint") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, schedulerId)
+ val shardEndedInput = ShardEndedInput.builder()
+ .checkpointer(checkpointerMock)
+ .build()
+ recordProcessor.initialize(dummyInitializationInput)
+ recordProcessor.shardEnded(shardEndedInput)
+
+ verify(receiverMock, times(1)).removeCheckpointer(meq(shardId),
meq(checkpointerMock))
+ }
- test("shutdown should not checkpoint if the reason is something other than
TERMINATE") {
+ test("SPARK-45720: leaseLost should not checkpoint") {
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
- recordProcessor.shutdown(checkpointerMock, null)
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, schedulerId)
+ val leaseLostInput = LeaseLostInput.builder().build()
+ recordProcessor.initialize(dummyInitializationInput)
+ recordProcessor.leaseLost(leaseLostInput)
- verify(receiverMock, times(2)).removeCheckpointer(meq(shardId),
- meq[IRecordProcessorCheckpointer](null))
+ verify(receiverMock, times(1)).removeCheckpointer(meq(shardId),
+ meq[RecordProcessorCheckpointer](null))
}
test("retry success on first attempt") {
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 43c4118d8f59..f3b1015df32c 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -21,10 +21,10 @@ import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.Random
-import com.amazonaws.services.kinesis.model.Record
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers._
+import software.amazon.kinesis.retrieval.KinesisClientRecord
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.network.util.JavaUtils
@@ -195,7 +195,7 @@ abstract class KinesisStreamTests(aggregateTestData:
Boolean) extends KinesisFun
}
testIfEnabled("custom message handling") {
- def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
+    def addFive(r: KinesisClientRecord): Int = JavaUtils.bytesToString(r.data).toInt + 5
val stream = KinesisInputDStream.builder.streamingContext(ssc)
.checkpointAppName(appName)
@@ -305,7 +305,7 @@ abstract class KinesisStreamTests(aggregateTestData:
Boolean) extends KinesisFun
val testData2 = 11 to 20
val testData3 = 21 to 30
- eventually(timeout(1.minute), interval(10.seconds)) {
+ eventually(timeout(2.minute), interval(10.seconds)) {
localTestUtils.pushData(testData1, aggregateTestData)
collected.synchronized {
          assert(collected === testData1.toSet, "\nData received does not match data sent")
@@ -313,9 +313,9 @@ abstract class KinesisStreamTests(aggregateTestData:
Boolean) extends KinesisFun
}
val shardToSplit = localTestUtils.getShards().head
- localTestUtils.splitShard(shardToSplit.getShardId)
+ localTestUtils.splitShard(shardToSplit.shardId)
val (splitOpenShards, splitCloseShards) =
localTestUtils.getShards().partition { shard =>
- shard.getSequenceNumberRange.getEndingSequenceNumber == null
+ shard.sequenceNumberRange.endingSequenceNumber == null
}
// We should have one closed shard and two open shards
@@ -331,9 +331,9 @@ abstract class KinesisStreamTests(aggregateTestData:
Boolean) extends KinesisFun
}
val Seq(shardToMerge, adjShard) = splitOpenShards
- localTestUtils.mergeShard(shardToMerge.getShardId, adjShard.getShardId)
+ localTestUtils.mergeShard(shardToMerge.shardId, adjShard.shardId)
val (mergedOpenShards, mergedCloseShards) =
localTestUtils.getShards().partition { shard =>
- shard.getSequenceNumberRange.getEndingSequenceNumber == null
+ shard.sequenceNumberRange.endingSequenceNumber == null
}
// We should have three closed shards and one open shard
diff --git
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 7674cef105e7..09d92b718447 100644
---
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kinesis
-import java.nio.ByteBuffer
+import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
@@ -26,13 +26,15 @@ import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import com.amazonaws.services.dynamodbv2.document.DynamoDB
-import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
-import com.amazonaws.services.kinesis.model._
-import com.amazonaws.waiters.WaiterParameters
+import software.amazon.awssdk.auth.credentials.{AwsCredentials,
DefaultCredentialsProvider}
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.http.apache.ApacheHttpClient
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient
+import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest
+import software.amazon.awssdk.services.kinesis.KinesisClient
+import software.amazon.awssdk.services.kinesis.model.{CreateStreamRequest,
DeleteStreamRequest, DescribeStreamRequest, MergeShardsRequest,
PutRecordRequest, ResourceNotFoundException, Shard, SplitShardRequest,
StreamDescription}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{STREAM_NAME, TABLE_NAME}
@@ -47,7 +49,6 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int
= 2) extends Loggi
val endpointUrl = KinesisTestUtils.endpointUrl
val regionName = KinesisTestUtils.getRegionNameByEndpoint(endpointUrl)
- private val createStreamTimeoutSeconds = 300
private val describeStreamPollTimeSeconds = 1
@volatile
@@ -56,18 +57,23 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
@volatile
private var _streamName: String = _
- protected lazy val kinesisClient = {
- val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
- client.setEndpoint(endpointUrl)
- client
+ protected lazy val kinesisClient: KinesisClient = {
+ KinesisClient.builder()
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .region(Region.of(regionName))
+ .httpClientBuilder(ApacheHttpClient.builder())
+ .endpointOverride(URI.create(endpointUrl))
+ .build()
}
- private lazy val streamExistsWaiter = kinesisClient.waiters().streamExists()
+ private lazy val streamExistsWaiter = kinesisClient.waiter()
private lazy val dynamoDB = {
-    val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
- dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
- new DynamoDB(dynamoDBClient)
+ DynamoDbClient.builder()
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .region(Region.of(regionName))
+ .httpClientBuilder(ApacheHttpClient.builder())
+ .build()
}
protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
@@ -89,9 +95,10 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
// Create a stream. The number of shards determines the provisioned
throughput.
logInfo(s"Creating stream ${_streamName}")
- val createStreamRequest = new CreateStreamRequest()
- createStreamRequest.setStreamName(_streamName)
- createStreamRequest.setShardCount(streamShardCount)
+ val createStreamRequest = CreateStreamRequest.builder()
+ .streamName(_streamName)
+ .shardCount(streamShardCount)
+ .build()
kinesisClient.createStream(createStreamRequest)
// The stream is now being created. Wait for it to become active.
@@ -101,25 +108,30 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
def getShards(): Seq[Shard] = {
-    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
+ val describeStreamRequest = DescribeStreamRequest.builder()
+ .streamName(_streamName)
+ .build()
+    kinesisClient.describeStream(describeStreamRequest).streamDescription.shards.asScala.toSeq
}
def splitShard(shardId: String): Unit = {
- val splitShardRequest = new SplitShardRequest()
- splitShardRequest.withStreamName(_streamName)
- splitShardRequest.withShardToSplit(shardId)
- // Set a half of the max hash value
-    splitShardRequest.withNewStartingHashKey("170141183460469231731687303715884105728")
+ val splitShardRequest = SplitShardRequest.builder()
+ .streamName(_streamName)
+ .shardToSplit(shardId)
+ // Set a half of the max hash value
+ .newStartingHashKey("170141183460469231731687303715884105728")
+ .build()
kinesisClient.splitShard(splitShardRequest)
// Wait for the shards to become active
waitForStreamToBeActive(_streamName)
}
def mergeShard(shardToMerge: String, adjacentShardToMerge: String): Unit = {
- val mergeShardRequest = new MergeShardsRequest
- mergeShardRequest.withStreamName(_streamName)
- mergeShardRequest.withShardToMerge(shardToMerge)
- mergeShardRequest.withAdjacentShardToMerge(adjacentShardToMerge)
+ val mergeShardRequest = MergeShardsRequest.builder()
+ .streamName(_streamName)
+ .shardToMerge(shardToMerge)
+ .adjacentShardToMerge(adjacentShardToMerge)
+ .build()
kinesisClient.mergeShards(mergeShardRequest)
// Wait for the shards to become active
waitForStreamToBeActive(_streamName)
@@ -145,9 +157,12 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
def deleteStream(): Unit = {
+ val deleteStreamRequest = DeleteStreamRequest.builder()
+ .streamName(streamName)
+ .build()
try {
if (streamCreated) {
- kinesisClient.deleteStream(streamName)
+ kinesisClient.deleteStream(deleteStreamRequest)
}
} catch {
case e: Exception =>
@@ -156,10 +171,11 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
def deleteDynamoDBTable(tableName: String): Unit = {
+ val deleteTableRequest = DeleteTableRequest.builder()
+ .tableName(tableName)
+ .build()
try {
- val table = dynamoDB.getTable(tableName)
- table.delete()
- table.waitForDelete()
+ dynamoDB.deleteTable(deleteTableRequest)
} catch {
case e: Exception =>
logWarning(log"Could not delete DynamoDB table ${MDC(TABLE_NAME,
tableName)}", e)
@@ -168,11 +184,14 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
private def describeStream(streamNameToDescribe: String):
Option[StreamDescription] = {
try {
-      val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
-      val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+ val describeStreamRequest = DescribeStreamRequest.builder()
+ .streamName(streamNameToDescribe)
+ .build()
+      val desc = kinesisClient.describeStream(describeStreamRequest).streamDescription
Some(desc)
} catch {
case rnfe: ResourceNotFoundException =>
+ logWarning(s"Could not describe stream $streamNameToDescribe", rnfe)
None
}
}
@@ -187,9 +206,10 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
- val describeStreamRequest = new DescribeStreamRequest()
- .withStreamName(streamNameToWaitFor)
- streamExistsWaiter.run(new WaiterParameters(describeStreamRequest))
+ val describeStreamRequest = DescribeStreamRequest.builder()
+ .streamName(streamNameToWaitFor)
+ .build()
+ streamExistsWaiter.waitUntilStreamExists(describeStreamRequest)
}
}
@@ -201,10 +221,11 @@ private[kinesis] object KinesisTestUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
- RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
+ val kinesisServiceMetadata = new KinesisServiceMetadata()
+ kinesisServiceMetadata.regions
.asScala
- .find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
- .map(_.getName)
+      .find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
+ .map(_.id)
.getOrElse(
        throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
}
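The SDK v2 replacement for `RegionUtils` inverts the lookup: the service metadata reports each region's endpoint and the caller matches on it. A small sketch, with an illustrative region:
```scala
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

// Look up the public Kinesis endpoint the SDK knows for a given region.
val metadata = new KinesisServiceMetadata()
val endpoint = metadata.endpointFor(Region.US_WEST_2)
println(endpoint) // expected to print something like kinesis.us-west-2.amazonaws.com
```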
@@ -239,20 +260,20 @@ private[kinesis] object KinesisTestUtils {
}
def isAWSCredentialsPresent: Boolean = {
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+ Try { DefaultCredentialsProvider.create().resolveCredentials() }.isSuccess
}
- def getAWSCredentials(): AWSCredentials = {
+ def getAWSCredentials(): AwsCredentials = {
assert(shouldRunTests,
"Kinesis test not enabled, should not attempt to get AWS credentials")
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+ Try { DefaultCredentialsProvider.create().resolveCredentials() } match {
case Success(cred) => cred
case Failure(e) =>
throw new Exception(
s"""
|Kinesis tests enabled using environment variable
$envVarNameForEnablingTests
|but could not find AWS credentials. Please follow instructions
in AWS documentation
-            |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
+            |to set the credentials in your system such that the DefaultCredentialsProvider
|can find the credentials.
""".stripMargin)
}
@@ -266,19 +287,21 @@ private[kinesis] trait KinesisDataGenerator {
}
private[kinesis] class SimpleDataGenerator(
- client: AmazonKinesisClient) extends KinesisDataGenerator {
+ client: KinesisClient) extends KinesisDataGenerator {
override def sendData(streamName: String, data: Seq[Int]): Map[String,
Seq[(Int, String)]] = {
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int,
String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
- val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
- .withData(data)
- .withPartitionKey(str)
-
- val putRecordResult = client.putRecord(putRecordRequest)
- val shardId = putRecordResult.getShardId
- val seqNumber = putRecordResult.getSequenceNumber()
+ val data = SdkBytes.fromByteArray(str.getBytes(StandardCharsets.UTF_8))
+ val putRecordRequest = PutRecordRequest.builder()
+ .streamName(streamName)
+ .data(data)
+ .partitionKey(str)
+ .build()
+
+ val putRecordResponse = client.putRecord(putRecordRequest)
+ val shardId = putRecordResponse.shardId
+ val seqNumber = putRecordResponse.sequenceNumber
val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
new ArrayBuffer[(Int, String)]())
sentSeqNumbers += ((num, seqNumber))
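Putting the client construction and `putRecord` call from this utility together, a standalone SDK v2 producer looks roughly like the sketch below (stream name, region and endpoint are illustrative; credentials come from the default chain):
```scala
import java.net.URI
import java.nio.charset.StandardCharsets

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest

// Illustrative stream name, region and endpoint; credentials resolve via the default chain.
val client = KinesisClient.builder()
  .credentialsProvider(DefaultCredentialsProvider.create())
  .region(Region.US_WEST_2)
  .httpClientBuilder(ApacheHttpClient.builder())
  .endpointOverride(URI.create("https://kinesis.us-west-2.amazonaws.com"))
  .build()

val response = client.putRecord(
  PutRecordRequest.builder()
    .streamName("kinesis-example-stream")
    .partitionKey("key-1")
    .data(SdkBytes.fromByteArray("hello".getBytes(StandardCharsets.UTF_8)))
    .build())

println(s"shard=${response.shardId} seq=${response.sequenceNumber}")
```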
diff --git a/docs/streaming-kinesis-integration.md
b/docs/streaming-kinesis-integration.md
index 0396d3cc64d1..34587d9b17ca 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -115,18 +115,16 @@ A Kinesis stream can be set up at one of the valid
Kinesis endpoints with 1 or m
You may also provide the following settings. This is currently only
supported in Scala and Java.
- - A "message handler function" that takes a Kinesis `Record` and returns a
generic object `T`, in case you would like to use other data included in a
`Record` such as partition key.
+ - A "message handler function" that takes a Kinesis `KinesisClientRecord`
and returns a generic object `T`, in case you would like to use other data
included in a `Record` such as partition key.
<div class="codetabs">
<div data-lang="scala" markdown="1">
```scala
- import collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
-  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
- import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
+ import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}
val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
@@ -138,21 +136,23 @@ A Kinesis stream can be set up at one of the valid
Kinesis endpoints with 1 or m
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
-        .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
+        .metricsEnabledDimensions(
+          Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
.buildWithMessageHandler([message handler])
```
</div>
<div data-lang="java" markdown="1">
```java
+ import java.util.Set;
+ import scala.jdk.javaapi.CollectionConverters;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
-  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
- import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
- import scala.collection.JavaConverters;
+ import software.amazon.kinesis.metrics.MetricsLevel;
+ import software.amazon.kinesis.metrics.MetricsUtil;
KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
@@ -165,11 +165,10 @@ A Kinesis stream can be set up at one of the valid
Kinesis endpoints with 1 or m
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
.metricsEnabledDimensions(
- JavaConverters.asScalaSetConverter(
-            KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS
- )
- .asScala().toSet()
- )
+ CollectionConverters.asScala(
+ Set.of(
+            MetricsUtil.OPERATION_DIMENSION_NAME,
+            MetricsUtil.SHARD_ID_DIMENSION_NAME)).toSet())
.buildWithMessageHandler([message handler]);
```
@@ -194,7 +193,7 @@ A Kinesis stream can be set up at one of the valid Kinesis
endpoints with 1 or m
- `[initial position]`: Can be either
`KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or
`KinesisInitialPositions.AtTimestamp` (see [`Kinesis
Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API
documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html)
for more details).
- - `[message handler]`: A function that takes a Kinesis `Record` and
outputs generic `T`.
+ - `[message handler]`: A function that takes a Kinesis
`KinesisClientRecord` and outputs generic `T`.
In other versions of the API, you can also specify the AWS access key and
secret key directly.
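One possible shape for such a handler (a sketch only; the function name and tuple return type are illustrative) keeps the partition key next to the decoded payload:
```scala
import org.apache.spark.network.util.JavaUtils
import software.amazon.kinesis.retrieval.KinesisClientRecord

// Hypothetical message handler: expose the partition key together with the payload.
def keyedHandler(record: KinesisClientRecord): (String, String) =
  (record.partitionKey(), JavaUtils.bytesToString(record.data()))
```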
diff --git a/pom.xml b/pom.xml
index ae38b87c3f95..143808b48841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,12 +159,11 @@
<codahale.metrics.version>4.2.37</codahale.metrics.version>
<!-- Should be consistent with SparkBuild.scala and docs -->
<avro.version>1.12.1</avro.version>
- <aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
+ <aws.kinesis.client.version>2.7.2</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
- <aws.java.sdk.version>1.12.681</aws.java.sdk.version>
<aws.java.sdk.v2.version>2.35.4</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
- <aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
+ <aws.kinesis.producer.version>1.0.6</aws.kinesis.producer.version>
<!-- Do not use 3.0.0:
https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
<gcs-connector.version>hadoop3-2.2.29</gcs-connector.version>
<analyticsaccelerator-s3.version>1.3.0</analyticsaccelerator-s3.version>
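For applications that depend on KCL directly, the property above corresponds, assuming the standard KCL 2.x coordinates, to the following sbt dependency:
```scala
// sbt dependency for the KCL version referenced by aws.kinesis.client.version
libraryDependencies += "software.amazon.kinesis" % "amazon-kinesis-client" % "2.7.2"
```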
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]