AndrewJSchofield commented on code in PR #17099:
URL: https://github.com/apache/kafka/pull/17099#discussion_r1746375669
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -82,11 +83,12 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V>
record) {
* @param metadata The metadata for the record that was sent (i.e. the
partition and offset).
* If an error occurred, metadata will only contain valid
topic and maybe partition.
* @param exception The exception thrown during processing of this record.
Null if no error occurred.
+ * @param headers The headers for the record that was sent
*/
- public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
+ public void onAcknowledgement(RecordMetadata metadata, Exception
exception, Headers headers) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
- interceptor.onAcknowledgement(metadata, exception);
+ interceptor.onAcknowledgement(metadata, exception, headers);
Review Comment:
I know this came up during the KIP review, but really the headers ought to
be read-only here. You could make a read-only implementation of `Headers` that
delegates all read operations to the actual `Headers`. Otherwise, some cunning
person might start modifying the headers structure and passing it between
interceptors.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java:
##########
@@ -82,11 +83,12 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V>
record) {
* @param metadata The metadata for the record that was sent (i.e. the
partition and offset).
* If an error occurred, metadata will only contain valid
topic and maybe partition.
* @param exception The exception thrown during processing of this record.
Null if no error occurred.
+ * @param headers The headers for the record that was sent
*/
- public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
+ public void onAcknowledgement(RecordMetadata metadata, Exception
exception, Headers headers) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
- interceptor.onAcknowledgement(metadata, exception);
+ interceptor.onAcknowledgement(metadata, exception, headers);
Review Comment:
Ah, no. Not necessary. The producer has already made them read-only. :)
##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -2397,7 +2399,15 @@ public ProducerRecord<byte[], byte[]>
onSend(ProducerRecord<byte[], byte[]> reco
}
@Override
- public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
+ public void onAcknowledgement(RecordMetadata metadata, Exception
exception, Headers headers) {
+ if (headers == null) {
+ return;
+ }
+ if (!(headers instanceof RecordHeaders)) {
+ return;
+ }
+ RecordHeaders recordHeaders = (RecordHeaders) headers;
Review Comment:
Well, I think it is important that they are read-only so this assertion
probably does have some value.
##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -2397,7 +2399,12 @@ public ProducerRecord<byte[], byte[]>
onSend(ProducerRecord<byte[], byte[]> reco
}
@Override
- public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
+ public void onAcknowledgement(RecordMetadata metadata, Exception
exception, Headers headers) {
+ if (!(headers instanceof RecordHeaders)) {
Review Comment:
Yes, I agree.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]