ruchirvaninasdaq opened a new issue #834: URL: https://github.com/apache/camel-kafka-connector/issues/834
Hello, I am using the `aws2-s3-kafka-source-connector` connector (https://github.com/apache/camel-kafka-connector/tree/camel-kafka-connector-0.7.x/connectors/camel-aws2-s3-kafka-connector) with the following configs. ``` apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaConnector metadata: name: name-connector labels: strimzi.io/cluster: benzinga-kafka-connect-cluster spec: tasksMax: 1 class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector config: client.id: client topics: topic connector.class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector key.converter: org.apache.kafka.connect.storage.StringConverter value.converter: org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter camel.source.kafka.topic: topic camel.source.url: aws2-s3://source-bucket?useDefaultCredentialsProvider=true&moveAfterRead=true&destinationBucket=destination-bucket camel.source.maxPollDuration: 10 camel.source.maxBatchPollSize: 1000 camel.component.aws2-s3.includeBody: false camel.source.endpoint.useDefaultCredentialsProvider : true camel.component.aws2-s3.autocloseBody : true ``` I have updated the S3objectConverter for my customization for searlizer. Code is as follows: ``` public class S3ObjectConverter implements Converter { private static final Logger LOG = LoggerFactory.getLogger(S3ObjectConverter.class); //private final S3ObjectSerializer serializer = new S3ObjectSerializer(); private final S3ObjectAvroSerializer serializer; public S3ObjectConverter() throws IOException { serializer = new S3ObjectAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public byte[] fromConnectData(String topic, Schema schema, Object value) { return serializer.serialize(topic, (ResponseInputStream<GetObjectResponse>)value); } @Override public SchemaAndValue toConnectData(String arg0, byte[] arg1) { return null; } } ``` This works as expected and the object gets serialize as expected and added to Kafka topic and files get moved to destination-bucket also. I have problem with on failure cases: When the object fail on seralization, even after that it moves to destinationbucket (I expect it to stay in source bucket), Is there any config am I using wrong? Thank you. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org