ravikalla opened a new pull request, #20212:
URL: https://github.com/apache/kafka/pull/20212
## Summary
This PR implements KAFKA-10840 to expose authentication failures in the
KafkaConsumer.poll() method, allowing applications to catch authentication
issues immediately instead of experiencing silent failures.
## Problem
Previously, when SSL certificates expired or other authentication issues
occurred, the consumer would stop fetching data without clear indication of the
underlying problem. This led to "data flow stops without indication" scenarios
that were difficult to troubleshoot and handle gracefully.
## Solution
### New Exception Classes
- **CertificateExpiredAuthenticationException**: Specifically for SSL
certificate expiration scenarios
- **PersistentAuthenticationException**: For non-retriable authentication
failures (SASL, general SSL handshake failures)
### Core Changes
- Modified `ClassicKafkaConsumer.poll()` to actively check all known cluster
nodes for authentication exceptions before proceeding with fetch operations
- Added `authenticationException(Node)` method to `ConsumerNetworkClient` to
expose authentication state from the underlying `KafkaClient`
- Enhanced `MockClient` with `setNodeAuthenticationFailure()` method for
testing authentication failure scenarios
### Error Detection Logic
The implementation detects certificate expiration by checking for specific
patterns in SSL exception messages:
- "certificate expired"
- "Certificate expired"
- "CERTIFICATE_EXPIRED"
- "certificate has expired"
- "expired certificate"
## Usage Example
```java
try {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
} catch (CertificateExpiredAuthenticationException e) {
log.error("SSL certificate expired: {}", e.getMessage());
// Handle certificate renewal
} catch (PersistentAuthenticationException e) {
log.error("Authentication failed: {}", e.getMessage());
// Handle authentication configuration
}
```
## Test Plan
- [x] All existing Kafka consumer tests pass
- [x] All ConsumerNetworkClient tests pass
- [x] Code passes checkstyle and spotbugs checks
- [x] Implementation is backward compatible
- [x] Authentication failure scenarios can be tested using MockClient
enhancements
## Files Changed
1.
`clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java`
(NEW)
2.
`clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java`
(NEW)
3.
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java`
(MODIFIED)
4.
`clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java`
(MODIFIED)
5. `clients/src/test/java/org/apache/kafka/clients/MockClient.java`
(MODIFIED)
This addresses the core issue where "data flow stops without indication"
when authentication fails, enabling applications to detect and handle these
failures proactively rather than experiencing silent timeouts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]