ruchirvaninasdaq edited a comment on issue #97:
URL: 
https://github.com/apache/camel-kafka-connector/issues/97#issuecomment-673628462


   In S3Enpoint.java class: 
   
   createExchange(ExchangePattern pattern, S3Object s3Object)
   
   `public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
           LOG.trace("Getting object with key [{}] from bucket [{}]...", 
s3Object.getKey(), s3Object.getBucketName());
   
           ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
   
           LOG.trace("Got object [{}]", s3Object);
   
           Exchange exchange = super.createExchange(pattern);
           Message message = exchange.getIn();
   
           if (configuration.isIncludeBody()) {
               if(verifySchema()){
                   message.setBody(s3Object.getObjectContent());
                   message.setHeader(S3Constants.SUCCESSFUL_MESSAGE, "true");
               } else {
                   Log.error()
                   message.setHeader(S3Constants.SUCCESSFUL_MESSAGE, "false");
                   message.setBody(null);
               }
           } else {
               message.setBody(null);
           }
   
           message.setHeader(S3Constants.KEY, s3Object.getKey());
           message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
           message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
           message.setHeader(S3Constants.LAST_MODIFIED, 
objectMetadata.getLastModified());
           message.setHeader(S3Constants.VERSION_ID, 
objectMetadata.getVersionId());
           message.setHeader(S3Constants.CONTENT_TYPE, 
objectMetadata.getContentType());
           message.setHeader(S3Constants.CONTENT_MD5, 
objectMetadata.getContentMD5());
           message.setHeader(S3Constants.CONTENT_LENGTH, 
objectMetadata.getContentLength());
           message.setHeader(S3Constants.CONTENT_ENCODING, 
objectMetadata.getContentEncoding());
           message.setHeader(S3Constants.CONTENT_DISPOSITION, 
objectMetadata.getContentDisposition());
           message.setHeader(S3Constants.CACHE_CONTROL, 
objectMetadata.getCacheControl());
           message.setHeader(S3Constants.S3_HEADERS, 
objectMetadata.getRawMetadata());
           message.setHeader(S3Constants.SERVER_SIDE_ENCRYPTION, 
objectMetadata.getSSEAlgorithm());
           message.setHeader(S3Constants.USER_METADATA, 
objectMetadata.getUserMetadata());
           message.setHeader(S3Constants.EXPIRATION_TIME, 
objectMetadata.getExpirationTime());
           message.setHeader(S3Constants.REPLICATION_STATUS, 
objectMetadata.getReplicationStatus());
           message.setHeader(S3Constants.STORAGE_CLASS, 
objectMetadata.getStorageClass());
   
           /**
            * If includeBody != true, it is safe to close the object here. If
            * includeBody == true, the caller is responsible for closing the 
stream
            * and object once the body has been fully consumed. As of 2.17, the
            * consumer does not close the stream or object on commit.
            */
           if (!configuration.isIncludeBody()) {
               IOHelper.close(s3Object);
           } else {
               if (configuration.isAutocloseBody()) {
                   exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                       @Override
                       public void onDone(Exchange exchange) {
                           IOHelper.close(s3Object);
                       }
                   });
               }
           }
   
           return exchange;
       }`
   
   
   
   And within S3Consumer:
   
   `protected void processCommit(Exchange exchange) {
           try {
               if (getConfiguration().isDeleteAfterRead()) {
                   String bucketName = 
exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
                   String key = exchange.getIn().getHeader(S3Constants.KEY, 
String.class);
   
   
                   LOG.trace("Deleting object from bucket {} with key {}...", 
bucketName, key);
   
                   if 
(exchange.getIn().getHeader(S3Constants.SUCCESSFUL_MESSAGE, 
String.class).equals("true")) {
                       getAmazonS3Client().copyObject(bucketName, key, 
getConfiguration().getTargetBucketName(), generateSuccessKey(exchange));
   
                   }
                   else {
                       getAmazonS3Client().copyObject(bucketName, key, 
getConfiguration().getTargetBucketName(), generatefailedKey(exchange));
   
                   }
                   getAmazonS3Client().deleteObject(bucketName, key);
   
                   LOG.trace("Deleted object from bucket {} with key {}...", 
bucketName, key);
               }
           } catch (AmazonClientException e) {
               getExceptionHandler().handleException("Error occurred during 
deleting object. This exception is ignored.", exchange, e);
           }
       }`
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to