vamossagar12 commented on code in PR #14372:
URL: https://github.com/apache/kafka/pull/14372#discussion_r1670154691
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -2156,6 +2157,42 @@ public void testAlterOffsetsSourceConnector(boolean enableTopicCreation) throws
         verifyKafkaClusterId();
     }
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGetSourceConnectorOffsetsFetchError(boolean enableTopicCreation) {
+        setup(enableTopicCreation);
+        mockKafkaClusterId();
+
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        CloseableOffsetStorageReader offsetReader = mock(CloseableOffsetStorageReader.class);
+
+        Set<Map<String, Object>> connectorPartitions =
+            Collections.singleton(Collections.singletonMap("partitionKey", "partitionValue"));
+
+        when(executorService.submit(any(Runnable.class))).thenAnswer(invocation -> {
+            invocation.getArgument(0, Runnable.class).run();
+            return null;
+        });
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
+            allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(offsetStore.connectorPartitions(CONNECTOR_ID)).thenReturn(connectorPartitions);
+        doAnswer(invocation -> {
+            throw new ExecutionException(new SaslAuthenticationException("error"));
+        }).when(offsetReader).offsets(connectorPartitions);
Review Comment:
I am throwing `ExecutionException` here because, when the read to the end of
the offsets topic fails, the associated callback is completed with the error.
That error is usually wrapped in an `ExecutionException` when we eventually
call `get` on the callback
[here](https://github.com/apache/kafka/blob/67e68596329df1d990310f02767a47512a44d568/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java#L135).
Something similar happens in `OffsetStorageReaderImpl::offsets`
[here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java#L102),
where we wait on the future callback object that is submitted to the
`readToEnd` operation
[here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L281)
and is eventually handled in the `WorkThread`.
Yes, a `ConnectException` is still thrown, but it would wrap this exception,
in my opinion. I had shared a stack trace in the ticket, and this is what it
looks like:
```
[2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:440)
org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
    at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)
    at org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
    at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)
    at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
    ... 4 more
```
which is in line with what I tried to explain above. Note that, even in this
test, the next assertion is
```
assertEquals(ConnectException.class, e.getCause().getClass());
```
because I am invoking
[sourceConnectorOffsets](https://github.com/apache/kafka/blob/67e68596329df1d990310f02767a47512a44d568/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1264),
which does this wrapping.
Let me know if that explanation makes sense.
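
To illustrate the wrapping described in this comment, here is a minimal sketch that uses only the JDK and no Kafka classes. The names `WrappedFailureSketch`, `OuterException`, `awaitOrWrap` and `failWith` are hypothetical: `OuterException` stands in for `ConnectException`, and a plain `CompletableFuture` stands in for the Connect callback. It is not the actual Connect code path, only the same shape of failure: the root error surfaces from `get()` inside an `ExecutionException`, and the caller wraps that once more.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WrappedFailureSketch {

    // Hypothetical stand-in for ConnectException (not a Kafka class).
    static class OuterException extends RuntimeException {
        OuterException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Blocks on the future and rewraps any failure, similar in shape to a
    // caller that waits on a callback and throws its own exception type.
    static String awaitOrWrap(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            // get() reports the original failure as the cause of e.
            throw new OuterException("Error while reading offsets", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new OuterException("Interrupted while reading offsets", e);
        }
    }

    // Completes a future with the given root failure and returns the
    // exception that awaitOrWrap throws for it.
    static Throwable failWith(Throwable root) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(root);
        try {
            awaitOrWrap(future);
            return null;
        } catch (OuterException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable outer = failWith(new IllegalStateException("authentication failed"));
        // Print the chain from the outermost exception down to the root cause.
        for (Throwable t = outer; t != null; t = t.getCause()) {
            System.out.println(t.getClass().getName());
        }
    }
}
```

In this sketch the chain has three levels (outer wrapper, `ExecutionException`, root cause), which mirrors the `ConnectException` / `ExecutionException` / authentication error layering in the stack trace above.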