This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c373cb86e5 Add support for retry in kinesis producer (#8609) c373cb86e5 is described below commit c373cb86e590e5f4a3e845c8b23d2d1c4f2b738c Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Fri Apr 29 04:54:38 2022 +0530 Add support for retry in kinesis producer (#8609) Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> --- .../stream/kinesis/server/KinesisDataProducer.java | 105 ++++++++++++++++++--- 1 file changed, 90 insertions(+), 15 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java index 150ac78151..4b67797e80 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java @@ -20,9 +20,17 @@ package org.apache.pinot.plugin.stream.kinesis.server; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.UUID; import org.apache.pinot.spi.stream.StreamDataProducer; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.FixedDelayRetryPolicy; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; @@ -34,16 +42,27 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; + public class KinesisDataProducer implements StreamDataProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataProducer.class); + public static final String ENDPOINT = "endpoint"; public static final String REGION = "region"; public static final String ACCESS = "access"; public static final String SECRET = "secret"; - public static final String DEFAULT_PORT = "4566"; + public static final String NUM_RETRIES = "num_retries"; + public static final String RETRY_DELAY_MILLIS = "retry_delay_millis"; + public static final String DEFAULT_ENDPOINT = "http://localhost:4566"; + public static final String DEFAULT_RETRY_DELAY_MILLIS = "10000L"; + public static final String DEFAULT_NUM_RETRIES = "0"; private KinesisClient _kinesisClient; + private RetryPolicy _retryPolicy; @Override public void init(Properties props) { @@ -54,10 +73,9 @@ public class KinesisDataProducer implements StreamDataProducer { .credentialsProvider(getLocalAWSCredentials(props)) .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder()); } else { - kinesisClientBuilder = - KinesisClient.builder().region(Region.of(props.getProperty(REGION))) - .credentialsProvider(DefaultCredentialsProvider.create()) - .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder()); + kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION))) + .credentialsProvider(DefaultCredentialsProvider.create()) + .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder()); } if (props.containsKey(ENDPOINT)) { @@ -65,12 +83,16 @@ public class KinesisDataProducer implements StreamDataProducer { try { kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint)); } catch (URISyntaxException e) { - throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " - + kinesisEndpoint, e); + throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint, + e); } } _kinesisClient = kinesisClientBuilder.build(); + + int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES)); + long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS)); + _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs); } catch (Exception e) { _kinesisClient = null; } @@ -78,18 +100,35 @@ public class KinesisDataProducer implements StreamDataProducer { @Override public void produce(String topic, byte[] payload) { - PutRecordRequest putRecordRequest = - PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)) - .partitionKey(UUID.randomUUID().toString()).build(); - PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest); + try { + _retryPolicy.attempt(() -> putRecord(topic, null, payload)); + } catch (AttemptsExceededException ae) { + LOGGER.error("Retries exhausted while pushing record in stream {}", topic); + } catch (RetriableOperationException roe) { + LOGGER.error("Error occurred while pushing records in stream {}", topic, roe); + } } @Override public void produce(String topic, byte[] key, byte[] payload) { - PutRecordRequest putRecordRequest = - PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key)) - .build(); - PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest); + try { + _retryPolicy.attempt(() -> putRecord(topic, key, payload)); + } catch (AttemptsExceededException ae) { + LOGGER.error("Retries exhausted while pushing record in stream {}", topic); + } catch (RetriableOperationException roe) { + LOGGER.error("Error occurred while pushing records in stream {}", topic, roe); + } + } + + @Override + public void produceBatch(String topic, List<byte[]> rows) { + try { + _retryPolicy.attempt(() -> putRecordBatch(topic, rows)); + } catch (AttemptsExceededException ae) { + LOGGER.error("Retries exhausted while pushing record in stream {}", topic); + } catch (RetriableOperationException roe) { + LOGGER.error("Error occurred while pushing records in stream {}", topic, roe); + } } @Override @@ -101,4 +140,40 @@ public class KinesisDataProducer implements StreamDataProducer { return StaticCredentialsProvider.create( AwsBasicCredentials.create(props.getProperty(ACCESS), props.getProperty(SECRET))); } + + private boolean putRecordBatch(String topic, List<byte[]> rows) { + try { + List<PutRecordsRequestEntry> putRecordsRequestEntries = new ArrayList<>(); + for (byte[] row : rows) { + putRecordsRequestEntries.add(PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(row)) + .partitionKey(UUID.randomUUID().toString()).build()); + } + PutRecordsRequest putRecordsRequest = + PutRecordsRequest.builder().streamName(topic).records(putRecordsRequestEntries).build(); + + PutRecordsResponse putRecordsResponse = _kinesisClient.putRecords(putRecordsRequest); + return putRecordsResponse.sdkHttpResponse().isSuccessful(); + } catch (Exception e) { + LOGGER.warn("Exception occurred while pushing record to Kinesis {}", e.getMessage()); + return false; + } + } + + private boolean putRecord(String topic, byte[] key, byte[] payload) { + try { + PutRecordRequest.Builder putRecordRequestBuilder = + PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)); + + if (key != null) { + putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(new String(key)); + } else { + putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(UUID.randomUUID().toString()); + } + PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequestBuilder.build()); + return putRecordResponse.sdkHttpResponse().isSuccessful(); + } catch (Exception e) { + LOGGER.warn("Exception occurred while pushing record to Kinesis {}", e.getMessage()); + return false; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org