orpiske commented on pull request #1262: URL: https://github.com/apache/camel-kafka-connector/pull/1262#issuecomment-933633337
I understand the motivation for the fix, but I think that adding a fix that is somewhat specific to Kinesis in the core code is not the right way. Otherwise, the header `CamelAwsKinesisPartitionKey` as added on the PR would leak for other connectors that are not related to AWS Kinesis in any way. The proposed solution to use the SMT transformation is more adequate precisely because it can be implemented at the connector level. For instance, if you have an transformation such as: ``` @Override public R apply(R r) { Object value = r.value(); if (value instanceof MyTypeMessage) { // do any other specific transformation that you need and create the record using API that allows it to include extra headers return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), buildSchemaBuilderForType(// ... code); } else { LOG.debug("Unexpected message type: {}", value == null ? "null instance" : value.getClass()); return r; } } ``` I think that if you create a SMT that uses this [connect record API](https://kafka.apache.org/23/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#newRecord-java.lang.String-java.lang.Integer-org.apache.kafka.connect.data.Schema-java.lang.Object-org.apache.kafka.connect.data.Schema-java.lang.Object-java.lang.Long-java.lang.Iterable-) to include the header as you need, it would do the trick. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org