ruchirvaninasdaq opened a new issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834


   Hello, 
   
   I am using the `aws2-s3-kafka-source-connector` (https://github.com/apache/camel-kafka-connector/tree/camel-kafka-connector-0.7.x/connectors/camel-aws2-s3-kafka-connector) with the following configuration.
   
   ```
   apiVersion: kafka.strimzi.io/v1alpha1
   kind: KafkaConnector
   metadata:
     name: name-connector
     labels:
       strimzi.io/cluster: benzinga-kafka-connect-cluster
   spec:
     tasksMax: 1
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
     config:
       client.id: client
       topics: topic
       connector.class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
       camel.source.kafka.topic: topic
       camel.source.url: aws2-s3://source-bucket?useDefaultCredentialsProvider=true&moveAfterRead=true&destinationBucket=destination-bucket
       camel.source.maxPollDuration: 10
       camel.source.maxBatchPollSize: 1000
       camel.component.aws2-s3.includeBody: false
       camel.source.endpoint.useDefaultCredentialsProvider: true
       camel.component.aws2-s3.autocloseBody: true
   ```
   
   I have updated the S3ObjectConverter with my own custom serializer. The code is as follows:
   ```
   import java.io.IOException;
   import java.util.Map;

   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaAndValue;
   import org.apache.kafka.connect.storage.Converter;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   import software.amazon.awssdk.core.ResponseInputStream;
   import software.amazon.awssdk.services.s3.model.GetObjectResponse;

   public class S3ObjectConverter implements Converter {

       private static final Logger LOG = LoggerFactory.getLogger(S3ObjectConverter.class);

       //private final S3ObjectSerializer serializer = new S3ObjectSerializer();
       private final S3ObjectAvroSerializer serializer;

       public S3ObjectConverter() throws IOException {
           serializer = new S3ObjectAvroSerializer();
       }

       @Override
       public void configure(Map<String, ?> configs, boolean isKey) {
           // No configuration needed.
       }

       @Override
       public byte[] fromConnectData(String topic, Schema schema, Object value) {
           // The source connector hands the S3 object body over as a ResponseInputStream.
           return serializer.serialize(topic, (ResponseInputStream<GetObjectResponse>) value);
       }

       @Override
       public SchemaAndValue toConnectData(String arg0, byte[] arg1) {
           // Not used: this converter only serializes values onto the topic.
           return null;
       }
   }
   ```
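
   For context, I am not pasting my full serializer, but a minimal sketch of the shape of `S3ObjectAvroSerializer` that the converter above assumes looks like this (the schema and record layout here are placeholders, not my real ones):

   ```
   // Hypothetical sketch only: the real schema/record layout in my serializer is different.
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.EncoderFactory;
   import org.apache.kafka.connect.errors.DataException;

   import software.amazon.awssdk.core.ResponseInputStream;
   import software.amazon.awssdk.services.s3.model.GetObjectResponse;

   public class S3ObjectAvroSerializer {

       // Placeholder schema: a single "body" field holding the raw object bytes.
       private final Schema schema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"S3Object\",\"fields\":"
           + "[{\"name\":\"body\",\"type\":\"bytes\"}]}");

       public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> object) {
           try {
               // Read the whole S3 object body into memory.
               byte[] body = object.readAllBytes();

               GenericRecord record = new GenericData.Record(schema);
               record.put("body", ByteBuffer.wrap(body));

               // Encode the record as Avro binary.
               ByteArrayOutputStream out = new ByteArrayOutputStream();
               BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
               new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
               encoder.flush();
               return out.toByteArray();
           } catch (IOException e) {
               // This is where the serialization failures described below surface.
               throw new DataException("Failed to serialize S3 object for topic " + topic, e);
           }
       }
   }
   ```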
   
   This works as expected: the object gets serialized, added to the Kafka topic, and the file is moved to the destination bucket.
   
   The problem is with failure cases:
   When an object fails serialization, it is still moved to the destination bucket (I expect it to stay in the source bucket). Is there any config I am using incorrectly?
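
   For reference, the only error-handling knobs I am aware of are the standard Kafka Connect ones below; I have not verified whether they have any effect on the moveAfterRead behaviour, so this is just an assumption on my part:

   ```
   spec:
     config:
       # Standard Kafka Connect error handling (assumption: covers value.converter failures)
       errors.tolerance: none
       errors.log.enable: true
       errors.log.include.messages: true
   ```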
   
   Thank you. 
   
   

