ruchirvaninasdaq edited a comment on issue #97: URL: https://github.com/apache/camel-kafka-connector/issues/97#issuecomment-673628462
In the S3Endpoint.java class, in createExchange(ExchangePattern pattern, S3Object s3Object):

```java
public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
    LOG.trace("Getting object with key [{}] from bucket [{}]...", s3Object.getKey(), s3Object.getBucketName());

    ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
    LOG.trace("Got object [{}]", s3Object);

    Exchange exchange = super.createExchange(pattern);
    Message message = exchange.getIn();

    if (configuration.isIncludeBody()) {
        if (verifySchema()) {
            message.setBody(s3Object.getObjectContent());
            message.setHeader(S3Constants.SUCCESSFUL_MESSAGE, "true");
        } else {
            LOG.error("Schema verification failed for object with key [{}] from bucket [{}]", s3Object.getKey(), s3Object.getBucketName());
            message.setHeader(S3Constants.SUCCESSFUL_MESSAGE, "false");
            message.setBody(null);
        }
    } else {
        message.setBody(null);
    }

    message.setHeader(S3Constants.KEY, s3Object.getKey());
    message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
    message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
    message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
    message.setHeader(S3Constants.VERSION_ID, objectMetadata.getVersionId());
    message.setHeader(S3Constants.CONTENT_TYPE, objectMetadata.getContentType());
    message.setHeader(S3Constants.CONTENT_MD5, objectMetadata.getContentMD5());
    message.setHeader(S3Constants.CONTENT_LENGTH, objectMetadata.getContentLength());
    message.setHeader(S3Constants.CONTENT_ENCODING, objectMetadata.getContentEncoding());
    message.setHeader(S3Constants.CONTENT_DISPOSITION, objectMetadata.getContentDisposition());
    message.setHeader(S3Constants.CACHE_CONTROL, objectMetadata.getCacheControl());
    message.setHeader(S3Constants.S3_HEADERS, objectMetadata.getRawMetadata());
    message.setHeader(S3Constants.SERVER_SIDE_ENCRYPTION, objectMetadata.getSSEAlgorithm());
    message.setHeader(S3Constants.USER_METADATA, objectMetadata.getUserMetadata());
    message.setHeader(S3Constants.EXPIRATION_TIME, objectMetadata.getExpirationTime());
    message.setHeader(S3Constants.REPLICATION_STATUS, objectMetadata.getReplicationStatus());
    message.setHeader(S3Constants.STORAGE_CLASS, objectMetadata.getStorageClass());

    /**
     * If includeBody != true, it is safe to close the object here. If
     * includeBody == true, the caller is responsible for closing the stream
     * and object once the body has been fully consumed. As of 2.17, the
     * consumer does not close the stream or object on commit.
     */
    if (!configuration.isIncludeBody()) {
        IOHelper.close(s3Object);
    } else {
        if (configuration.isAutocloseBody()) {
            exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange exchange) {
                    IOHelper.close(s3Object);
                }
            });
        }
    }
    return exchange;
}
```

And within S3Consumer:

```java
protected void processCommit(Exchange exchange) {
    try {
        if (getConfiguration().isDeleteAfterRead()) {
            String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
            String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);

            LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);

            // null-safe check in case the header was never set (e.g. includeBody == false)
            if ("true".equals(exchange.getIn().getHeader(S3Constants.SUCCESSFUL_MESSAGE, String.class))) {
                getAmazonS3Client().copyObject(bucketName, key, getConfiguration().getTargetBucketName(), generateSuccessKey(exchange));
            } else {
                getAmazonS3Client().copyObject(bucketName, key, getConfiguration().getTargetBucketName(), generatefailedKey(exchange));
            }
            getAmazonS3Client().deleteObject(bucketName, key);

            LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
        }
    } catch (AmazonClientException e) {
        getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
    }
}
```
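The `verifySchema()` call above is not shown in the comment, so here is only a minimal sketch of what such a hook could look like. Everything in it (the `ObjectMetadata` parameter, the content-type check) is an assumption; since the sketch takes a parameter, the call site would become `verifySchema(objectMetadata)`. Driving the check from metadata rather than from the object content keeps the `InputStream` unread before it is handed to `message.setBody(...)`.

```java
// Hypothetical sketch only; the real verifySchema() is not shown in the comment.
// Assumes com.amazonaws.services.s3.model.ObjectMetadata is already imported in S3Endpoint.
// Inspecting metadata (instead of the content stream) avoids consuming the
// S3ObjectInputStream before it becomes the message body.
private boolean verifySchema(ObjectMetadata objectMetadata) {
    String contentType = objectMetadata.getContentType();
    return contentType != null && contentType.startsWith("application/json");
}
```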
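`S3Constants.SUCCESSFUL_MESSAGE` is not a stock constant of the component, so it would have to be added to `S3Constants`; the name and header key below are assumptions chosen to follow the existing `CamelAwsS3...` naming.

```java
// Hypothetical addition to S3Constants; name and header key are assumptions.
String SUCCESSFUL_MESSAGE = "CamelAwsS3SuccessfulMessage";
```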
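Likewise, `getConfiguration().getTargetBucketName()` is not a stock option, so the configuration would need a new field. A minimal sketch of that addition to S3Configuration, with the field and accessor names assumed:

```java
// Hypothetical addition to S3Configuration; targetBucketName is not a stock option.
@UriParam
private String targetBucketName;

public String getTargetBucketName() {
    return targetBucketName;
}

public void setTargetBucketName(String targetBucketName) {
    this.targetBucketName = targetBucketName;
}
```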
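The key generators used in processCommit are also not shown; a minimal sketch, assuming they simply prefix the original key so processed objects land under separate success/failed prefixes in the target bucket:

```java
// Hypothetical helpers; the actual generateSuccessKey/generatefailedKey are not shown.
private String generateSuccessKey(Exchange exchange) {
    return "success/" + exchange.getIn().getHeader(S3Constants.KEY, String.class);
}

private String generatefailedKey(Exchange exchange) {
    return "failed/" + exchange.getIn().getHeader(S3Constants.KEY, String.class);
}
```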