This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c373cb86e5 Add support for retry in kinesis producer (#8609)
c373cb86e5 is described below

commit c373cb86e590e5f4a3e845c8b23d2d1c4f2b738c
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Apr 29 04:54:38 2022 +0530

    Add support for retry in kinesis producer (#8609)
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../stream/kinesis/server/KinesisDataProducer.java | 105 ++++++++++++++++++---
 1 file changed, 90 insertions(+), 15 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
index 150ac78151..4b67797e80 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -20,9 +20,17 @@ package org.apache.pinot.plugin.stream.kinesis.server;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.FixedDelayRetryPolicy;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -34,16 +42,27 @@ import 
software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
 import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+
 
 public class KinesisDataProducer implements StreamDataProducer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisDataProducer.class);
+
   public static final String ENDPOINT = "endpoint";
   public static final String REGION = "region";
   public static final String ACCESS = "access";
   public static final String SECRET = "secret";
-  public static final String DEFAULT_PORT = "4566";
+  public static final String NUM_RETRIES = "num_retries";
+  public static final String RETRY_DELAY_MILLIS = "retry_delay_millis";
+
   public static final String DEFAULT_ENDPOINT = "http://localhost:4566";;
+  public static final String DEFAULT_RETRY_DELAY_MILLIS = "10000L";
+  public static final String DEFAULT_NUM_RETRIES = "0";
 
   private KinesisClient _kinesisClient;
+  private RetryPolicy _retryPolicy;
 
   @Override
   public void init(Properties props) {
@@ -54,10 +73,9 @@ public class KinesisDataProducer implements 
StreamDataProducer {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new 
ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new 
ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = 
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new 
ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
@@ -65,12 +83,16 @@ public class KinesisDataProducer implements 
StreamDataProducer {
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new 
URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly 
specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly 
specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, 
DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, 
DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
@@ -78,18 +100,35 @@ public class KinesisDataProducer implements 
StreamDataProducer {
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = 
_kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (AttemptsExceededException ae) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", 
topic);
+    } catch (RetriableOperationException roe) {
+      LOGGER.error("Error occurred while pushing records in stream {}", topic, 
roe);
+    }
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new
 String(key))
-            .build();
-    PutRecordResponse putRecordResponse = 
_kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, key, payload));
+    } catch (AttemptsExceededException ae) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", 
topic);
+    } catch (RetriableOperationException roe) {
+      LOGGER.error("Error occurred while pushing records in stream {}", topic, 
roe);
+    }
+  }
+
+  @Override
+  public void produceBatch(String topic, List<byte[]> rows) {
+    try {
+      _retryPolicy.attempt(() -> putRecordBatch(topic, rows));
+    } catch (AttemptsExceededException ae) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", 
topic);
+    } catch (RetriableOperationException roe) {
+      LOGGER.error("Error occurred while pushing records in stream {}", topic, 
roe);
+    }
   }
 
   @Override
@@ -101,4 +140,40 @@ public class KinesisDataProducer implements 
StreamDataProducer {
     return StaticCredentialsProvider.create(
         AwsBasicCredentials.create(props.getProperty(ACCESS), 
props.getProperty(SECRET)));
   }
+
+  private boolean putRecordBatch(String topic, List<byte[]> rows) {
+    try {
+      List<PutRecordsRequestEntry> putRecordsRequestEntries = new 
ArrayList<>();
+      for (byte[] row : rows) {
+        
putRecordsRequestEntries.add(PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(row))
+            .partitionKey(UUID.randomUUID().toString()).build());
+      }
+      PutRecordsRequest putRecordsRequest =
+          
PutRecordsRequest.builder().streamName(topic).records(putRecordsRequestEntries).build();
+
+      PutRecordsResponse putRecordsResponse = 
_kinesisClient.putRecords(putRecordsRequest);
+      return putRecordsResponse.sdkHttpResponse().isSuccessful();
+    } catch (Exception e) {
+      LOGGER.warn("Exception occurred while pushing record to Kinesis {}", 
e.getMessage());
+      return false;
+    }
+  }
+
+  private boolean putRecord(String topic, byte[] key, byte[] payload) {
+    try {
+      PutRecordRequest.Builder putRecordRequestBuilder =
+          
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload));
+
+      if (key != null) {
+        putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(new 
String(key));
+      } else {
+        putRecordRequestBuilder = 
putRecordRequestBuilder.partitionKey(UUID.randomUUID().toString());
+      }
+      PutRecordResponse putRecordResponse = 
_kinesisClient.putRecord(putRecordRequestBuilder.build());
+      return putRecordResponse.sdkHttpResponse().isSuccessful();
+    } catch (Exception e) {
+      LOGGER.warn("Exception occurred while pushing record to Kinesis {}", 
e.getMessage());
+      return false;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to